Advanced Pipeline Architectures

Building on the fundamentals, we can now explore more sophisticated pipeline architectures that address complex data processing requirements.

Fan-Out/Fan-In Pattern

The fan-out/fan-in pattern distributes work across multiple goroutines and then consolidates the results, enabling parallel processing of independent data items.

package main

import (
	"fmt"
	"sync"
	"time"
)

// generator produces integers from 1 to n
func generator(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- i
		}
	}()
	return out
}

// processor is a stage that simulates CPU-intensive work
func processor(id int, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			// Simulate CPU-intensive work
			time.Sleep(100 * time.Millisecond)
			result := n * n
			fmt.Printf("Processor %d: %d² = %d\n", id, n, result)
			out <- result
		}
	}()
	return out
}

// fanOut creates multiple processor stages that read from the same input channel
func fanOut(in <-chan int, numProcessors int) []<-chan int {
	outputs := make([]<-chan int, numProcessors)
	for i := 0; i < numProcessors; i++ {
		outputs[i] = processor(i+1, in)
	}
	return outputs
}

// fanIn combines multiple input channels into a single output channel
func fanIn(inputs []<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	
	// Start a goroutine for each input channel
	for _, ch := range inputs {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for n := range ch {
				out <- n
			}
		}(ch)
	}
	
	// Start a goroutine to close the output channel when all input channels are done
	go func() {
		wg.Wait()
		close(out)
	}()
	
	return out
}

func main() {
	startTime := time.Now()
	
	// Create the pipeline
	input := generator(10)
	processors := fanOut(input, 4)
	output := fanIn(processors)
	
	// Collect and sum the results
	sum := 0
	for n := range output {
		sum += n
	}
	
	fmt.Printf("Sum: %d\n", sum)
	fmt.Printf("Time taken: %v\n", time.Since(startTime))
}

This pattern is particularly useful when each item takes nontrivial time to process independently, whether the work is CPU-bound (where the speedup is capped by the number of available cores) or I/O-bound. The key components are:

  1. Fan-Out: Distribute work from a single source to multiple workers
  2. Parallel Processing: Each worker processes items independently
  3. Fan-In: Consolidate results from multiple workers into a single stream (in completion order, not input order; see the note below)
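
One caveat: because workers finish at different times, fanIn emits results in completion order, so the output order is nondeterministic. The sum computed in main is order-insensitive, so this does not matter above, but when ordering does matter, a common fix is to tag each item with its input position and reorder after the fan-in. A minimal sketch of such a stage; the indexed type and reorder function are illustrative additions, and they assume the processor stage is adapted to pass the tag through:

// indexed pairs a result with its position in the input stream.
type indexed struct {
	pos int
	val int
}

// reorder buffers out-of-order results and emits them in input order.
func reorder(in <-chan indexed) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		pending := make(map[int]int) // results waiting for their turn
		next := 0
		for item := range in {
			pending[item.pos] = item.val
			// Flush every result that is now contiguous with the output.
			for v, ok := pending[next]; ok; v, ok = pending[next] {
				out <- v
				delete(pending, next)
				next++
			}
		}
	}()
	return out
}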

Dynamic Pipeline Construction

In real-world applications, pipeline topologies may need to be constructed dynamically based on configuration or runtime conditions. Here’s a pattern for building pipelines dynamically:

package main

import (
	"fmt"
	"strings"
)

// PipelineStage represents a single stage in a pipeline
type PipelineStage func(<-chan string) <-chan string

// Pipeline represents a series of stages
type Pipeline struct {
	stages []PipelineStage
}

// NewPipeline creates a new empty pipeline
func NewPipeline() *Pipeline {
	return &Pipeline{stages: []PipelineStage{}}
}

// Add appends a stage to the pipeline
func (p *Pipeline) Add(stage PipelineStage) *Pipeline {
	p.stages = append(p.stages, stage)
	return p
}

// Run executes the pipeline with the given input
func (p *Pipeline) Run(input <-chan string) <-chan string {
	current := input
	for _, stage := range p.stages {
		current = stage(current)
	}
	return current
}

// Example pipeline stages
func toUpper(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			out <- strings.ToUpper(s)
		}
	}()
	return out
}

func addPrefix(prefix string) PipelineStage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for s := range in {
				out <- prefix + s
			}
		}()
		return out
	}
}

func addSuffix(suffix string) PipelineStage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for s := range in {
				out <- s + suffix
			}
		}()
		return out
	}
}

func filter(predicate func(string) bool) PipelineStage {
	return func(in <-chan string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for s := range in {
				if predicate(s) {
					out <- s
				}
			}
		}()
		return out
	}
}

func main() {
	// Create input channel
	input := make(chan string)
	
	// Build pipeline dynamically
	pipeline := NewPipeline()
	pipeline.Add(toUpper)
	pipeline.Add(addPrefix(">> "))
	pipeline.Add(filter(func(s string) bool {
		return len(s) > 5
	}))
	pipeline.Add(addSuffix(" <<"))
	
	// Run the pipeline
	output := pipeline.Run(input)
	
	// Feed input in a separate goroutine
	go func() {
		defer close(input)
		words := []string{"hello", "world", "pipeline", "go", "channels"}
		for _, word := range words {
			input <- word
		}
	}()
	
	// Collect results
	for result := range output {
		fmt.Println(result)
	}
}

This pattern provides several benefits:

  1. Composability: Pipeline stages can be combined in different ways
  2. Reusability: Stages can be reused across different pipelines
  3. Configurability: Pipelines can be constructed based on configuration
  4. Testability: Individual stages can be tested in isolation, as the test sketch below shows
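
Because every stage is just a function from one channel to another, a stage can be tested by feeding it a short, pre-closed input channel and draining its output synchronously. A minimal test sketch, assuming the stages above live in the package the test file belongs to:

import (
	"reflect"
	"testing"
)

func TestToUpper(t *testing.T) {
	// Feed the stage a small, already-closed input.
	in := make(chan string, 2)
	in <- "go"
	in <- "pipelines"
	close(in)

	// Drain the stage's output; range ends when the stage closes it.
	var got []string
	for s := range toUpper(in) {
		got = append(got, s)
	}

	want := []string{"GO", "PIPELINES"}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("toUpper: got %v, want %v", got, want)
	}
}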

Bidirectional Pipelines

Sometimes pipelines need to support bidirectional communication, where downstream stages send feedback or control signals back to upstream stages. A common Go idiom is to carry a reply channel inside each work item:

package main

import (
	"fmt"
	"time"
)

// Request represents a work item with a response channel
type Request struct {
	Data     int
	Response chan<- int
}

// Worker processes requests and sends responses back
func worker(requests <-chan Request) {
	for req := range requests {
		// Simulate processing
		time.Sleep(100 * time.Millisecond)
		result := req.Data * req.Data
		
		// Send response back through the response channel
		req.Response <- result
		close(req.Response) // Signal that no more responses will be sent
	}
}

func main() {
	// Create request channel
	requests := make(chan Request)
	
	// Start worker
	go worker(requests)
	
	// Send requests and receive responses
	for i := 1; i <= 5; i++ {
		// Create response channel for this request
		respCh := make(chan int)
		
		// Send request with response channel
		requests <- Request{
			Data:     i,
			Response: respCh,
		}
		
		// Wait for response
		resp := <-respCh
		fmt.Printf("Request: %d, Response: %d\n", i, resp)
	}
	
	close(requests)
}

This pattern enables:

  1. Request-Response Communication: Each request can receive a dedicated response (a concurrent variant is sketched below)
  2. Feedback Loops: Downstream stages can provide feedback to upstream stages
  3. Dynamic Flow Control: Processing can adapt based on feedback
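
Note that the loop in main above runs in lockstep: it waits for each response before sending the next request, so the worker never has more than one item in flight. Starting several workers on the same request channel and issuing all requests before collecting replies raises throughput while still matching each response to its request. A minimal sketch of an alternative main body, reusing the Request type and worker above; the worker count of three is an arbitrary choice:

requests := make(chan Request)

// Several workers share one request channel; each takes whichever
// request arrives next.
for w := 0; w < 3; w++ {
	go worker(requests)
}

// Issue all requests up front, keeping the reply channels in order.
replies := make([]chan int, 5)
for i := range replies {
	replies[i] = make(chan int, 1) // buffered: the worker never blocks on the reply
	requests <- Request{Data: i + 1, Response: replies[i]}
}
close(requests)

// Collect responses in request order, even though workers finish out of order.
for i, ch := range replies {
	fmt.Printf("Request: %d, Response: %d\n", i+1, <-ch)
}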

Multi-Stage Pipeline with Context Cancellation

For long-running pipelines, proper cancellation support is essential. Here’s a pattern that integrates context cancellation across multiple stages:

package main

import (
	"context"
	"fmt"
	"time"
)

// generator produces integers until cancelled
func generator(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		n := 1
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Generator cancelled")
				return
			case out <- n:
				n++
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()
	return out
}

// processor squares numbers and handles cancellation
func processor(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Processor cancelled")
				return
			case n, ok := <-in:
				if !ok {
					return
				}
				select {
				case <-ctx.Done():
					fmt.Println("Processor cancelled")
					return
				case out <- n * n:
					// Value sent successfully
				}
			}
		}
	}()
	return out
}

// filter only passes even numbers and handles cancellation
func filter(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			select {
			case <-ctx.Done():
				fmt.Println("Filter cancelled")
				return
			case n, ok := <-in:
				if !ok {
					return
				}
				if n%2 == 0 {
					select {
					case <-ctx.Done():
						fmt.Println("Filter cancelled")
						return
					case out <- n:
						// Value sent successfully
					}
				}
			}
		}
	}()
	return out
}

func main() {
	// Create a context with cancellation
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	
	// Build and run the pipeline
	gen := generator(ctx)
	proc := processor(ctx, gen)
	filt := filter(ctx, proc)
	
	// Consume the results
	for n := range filt {
		fmt.Println("Result:", n)
	}
	
	fmt.Println("Pipeline completed")
}

This pattern ensures:

  1. Graceful Termination: All stages can clean up resources when cancelled
  2. Propagation of Cancellation: Cancellation signals propagate through the entire pipeline, as the sketch below shows
  3. Timeout Support: Pipelines can automatically terminate after a specified duration
  4. Resource Management: Resources are properly released when the pipeline terminates
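
A timeout is only one trigger. The same structure lets the consumer cancel on any condition it likes, and the cancellation ripples back through every stage. A sketch of an alternative main, assuming the stages above, that stops the whole pipeline after five results:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

filt := filter(ctx, processor(ctx, generator(ctx)))

count := 0
for n := range filt {
	fmt.Println("Result:", n)
	count++
	if count == 5 {
		cancel() // every stage sees ctx.Done() and shuts down
	}
}
fmt.Println("Pipeline completed")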

These advanced pipeline architectures extend the basic model to parallel, dynamically composed, bidirectional, and cancellable designs. In the next section, we’ll explore specific patterns for stream processing.