Advanced Pipeline Architectures
Building on the fundamentals, we can now explore more sophisticated pipeline architectures that address complex data processing requirements.
Fan-Out/Fan-In Pattern
The fan-out/fan-in pattern distributes work across multiple goroutines and then consolidates the results, enabling parallel processing of independent data items.
package main

import (
    "fmt"
    "sync"
    "time"
)

// generator produces integers from 1 to n
func generator(n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 1; i <= n; i++ {
            out <- i
        }
    }()
    return out
}

// processor is a stage that simulates CPU-intensive work
func processor(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            // Simulate CPU-intensive work
            time.Sleep(100 * time.Millisecond)
            result := n * n
            fmt.Printf("Processor %d: %d² = %d\n", id, n, result)
            out <- result
        }
    }()
    return out
}

// fanOut creates multiple processor stages that read from the same input channel
func fanOut(in <-chan int, numProcessors int) []<-chan int {
    outputs := make([]<-chan int, numProcessors)
    for i := 0; i < numProcessors; i++ {
        outputs[i] = processor(i+1, in)
    }
    return outputs
}

// fanIn combines multiple input channels into a single output channel
func fanIn(inputs []<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    // Start a goroutine for each input channel
    for _, ch := range inputs {
        wg.Add(1)
        go func(ch <-chan int) {
            defer wg.Done()
            for n := range ch {
                out <- n
            }
        }(ch)
    }

    // Start a goroutine to close the output channel when all input channels are done
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    startTime := time.Now()

    // Create the pipeline
    input := generator(10)
    processors := fanOut(input, 4)
    output := fanIn(processors)

    // Collect and sum the results
    sum := 0
    for n := range output {
        sum += n
    }
    fmt.Printf("Sum: %d\n", sum)
    fmt.Printf("Time taken: %v\n", time.Since(startTime))
}
This pattern is particularly useful for CPU-bound tasks where parallel processing can significantly improve throughput: in the example above, ten items at 100 ms each take about one second sequentially but roughly 300 ms across four processors (a sketch for sizing the fan-out follows the list). The key components are:
- Fan-Out: Distribute work from a single source to multiple workers
- Parallel Processing: Each worker processes items independently
- Fan-In: Consolidate results from multiple workers into a single stream
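The fan-out width above is hard-coded to four. In practice it is common to derive it from the host, for example with runtime.NumCPU. Below is a minimal sketch of that approach, reusing the generator, fanOut, and fanIn functions from the example; the runPipeline name is introduced here for illustration, and "runtime" would need to be added to the imports.

// runPipeline sizes the fan-out to the number of available CPUs.
// A minimal sketch; real systems may cap or tune this differently.
func runPipeline(items int) int {
    workers := runtime.NumCPU() // requires importing "runtime"
    output := fanIn(fanOut(generator(items), workers))
    sum := 0
    for n := range output {
        sum += n
    }
    return sum
}

Note that the stage in this example sleeps rather than burns CPU, so it is not truly CPU-bound and would benefit from more workers than CPUs; NumCPU is simply the usual starting point for genuinely CPU-bound stages.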
Dynamic Pipeline Construction
In real-world applications, pipeline topologies may need to be constructed at runtime, based on configuration or other conditions. Here’s a pattern for building pipelines dynamically:
package main

import (
    "fmt"
    "strings"
)

// PipelineStage represents a single stage in a pipeline
type PipelineStage func(<-chan string) <-chan string

// Pipeline represents a series of stages
type Pipeline struct {
    stages []PipelineStage
}

// NewPipeline creates a new empty pipeline
func NewPipeline() *Pipeline {
    return &Pipeline{stages: []PipelineStage{}}
}

// Add appends a stage to the pipeline
func (p *Pipeline) Add(stage PipelineStage) *Pipeline {
    p.stages = append(p.stages, stage)
    return p
}

// Run executes the pipeline with the given input
func (p *Pipeline) Run(input <-chan string) <-chan string {
    current := input
    for _, stage := range p.stages {
        current = stage(current)
    }
    return current
}

// Example pipeline stages
func toUpper(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for s := range in {
            out <- strings.ToUpper(s)
        }
    }()
    return out
}

func addPrefix(prefix string) PipelineStage {
    return func(in <-chan string) <-chan string {
        out := make(chan string)
        go func() {
            defer close(out)
            for s := range in {
                out <- prefix + s
            }
        }()
        return out
    }
}

func addSuffix(suffix string) PipelineStage {
    return func(in <-chan string) <-chan string {
        out := make(chan string)
        go func() {
            defer close(out)
            for s := range in {
                out <- s + suffix
            }
        }()
        return out
    }
}

func filter(predicate func(string) bool) PipelineStage {
    return func(in <-chan string) <-chan string {
        out := make(chan string)
        go func() {
            defer close(out)
            for s := range in {
                if predicate(s) {
                    out <- s
                }
            }
        }()
        return out
    }
}

func main() {
    // Create input channel
    input := make(chan string)

    // Build pipeline dynamically
    pipeline := NewPipeline()
    pipeline.Add(toUpper)
    pipeline.Add(addPrefix(">> "))
    pipeline.Add(filter(func(s string) bool {
        return len(s) > 5
    }))
    pipeline.Add(addSuffix(" <<"))

    // Run the pipeline
    output := pipeline.Run(input)

    // Feed input in a separate goroutine
    go func() {
        defer close(input)
        words := []string{"hello", "world", "pipeline", "go", "channels"}
        for _, word := range words {
            input <- word
        }
    }()

    // Collect results
    for result := range output {
        fmt.Println(result)
    }
}
This pattern provides several benefits (a configuration-driven sketch follows the list):
- Composability: Pipeline stages can be combined in different ways
- Reusability: Stages can be reused across different pipelines
- Configurability: Pipelines can be constructed based on configuration
- Testability: Individual stages can be tested in isolation
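To make the configurability point concrete, here is a hedged sketch that assembles a Pipeline from a declarative stage list, reusing the types and stage constructors defined above. The StageConfig type and buildFromConfig function are hypothetical names introduced for illustration, and "strconv" would need to be imported.

// StageConfig is a hypothetical declarative description of one stage.
type StageConfig struct {
    Kind string // "upper", "prefix", "suffix", or "minlen"
    Arg  string
}

// buildFromConfig assembles a Pipeline from configuration entries.
// A sketch only; production code would validate kinds and surface errors.
func buildFromConfig(configs []StageConfig) *Pipeline {
    p := NewPipeline()
    for _, c := range configs {
        switch c.Kind {
        case "upper":
            p.Add(toUpper)
        case "prefix":
            p.Add(addPrefix(c.Arg))
        case "suffix":
            p.Add(addSuffix(c.Arg))
        case "minlen":
            if n, err := strconv.Atoi(c.Arg); err == nil {
                p.Add(filter(func(s string) bool { return len(s) > n }))
            }
        }
    }
    return p
}

With such a helper, the pipeline in main could be described as data, for example []StageConfig{{"upper", ""}, {"prefix", ">> "}, {"minlen", "5"}, {"suffix", " <<"}}, and loaded from a configuration file instead of being wired in code.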
Bidirectional Pipelines
Sometimes pipelines need to support bidirectional communication, where downstream stages can send feedback or control signals to upstream stages:
package main

import (
    "fmt"
    "time"
)

// Request represents a work item with a response channel
type Request struct {
    Data     int
    Response chan<- int
}

// Worker processes requests and sends responses back
func worker(requests <-chan Request) {
    for req := range requests {
        // Simulate processing
        time.Sleep(100 * time.Millisecond)
        result := req.Data * req.Data

        // Send response back through the response channel
        req.Response <- result
        close(req.Response) // Signal that no more responses will be sent
    }
}

func main() {
    // Create request channel
    requests := make(chan Request)

    // Start worker
    go worker(requests)

    // Send requests and receive responses
    for i := 1; i <= 5; i++ {
        // Create response channel for this request
        respCh := make(chan int)

        // Send request with response channel
        requests <- Request{
            Data:     i,
            Response: respCh,
        }

        // Wait for response
        resp := <-respCh
        fmt.Printf("Request: %d, Response: %d\n", i, resp)
    }
    close(requests)
}
This pattern enables the following (a timeout-hardened variant is sketched after the list):
- Request-Response Communication: Each request can receive a dedicated response
- Feedback Loops: Downstream stages can provide feedback to upstream stages
- Dynamic Flow Control: Processing can adapt based on feedback
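One caveat in the example above: the caller blocks indefinitely waiting for each response. A hedged refinement is to bound the wait with a select on time.After; the sendRequest helper below is a hypothetical name and assumes the Request type and requests channel from the example.

// sendRequest submits one request and waits up to timeout for the reply.
func sendRequest(requests chan<- Request, data int, timeout time.Duration) (int, bool) {
    // Buffered so the worker's send (and close) succeed even if we time out.
    respCh := make(chan int, 1)
    requests <- Request{Data: data, Response: respCh}
    select {
    case resp := <-respCh:
        return resp, true
    case <-time.After(timeout):
        return 0, false // gave up; the worker may still finish the item
    }
}

The buffer of one is the important detail: with an unbuffered channel, a timed-out caller would leave the worker blocked forever on req.Response <- result.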
Multi-Stage Pipeline with Context Cancellation
For long-running pipelines, proper cancellation support is essential. Here’s a pattern that integrates context cancellation across multiple stages:
package main

import (
    "context"
    "fmt"
    "time"
)

// generator produces integers until cancelled
func generator(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        n := 1
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Generator cancelled")
                return
            case out <- n:
                n++
                time.Sleep(100 * time.Millisecond)
            }
        }
    }()
    return out
}

// processor squares numbers and handles cancellation
func processor(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Processor cancelled")
                return
            case n, ok := <-in:
                if !ok {
                    return
                }
                select {
                case <-ctx.Done():
                    fmt.Println("Processor cancelled")
                    return
                case out <- n * n:
                    // Value sent successfully
                }
            }
        }
    }()
    return out
}

// filter only passes even numbers and handles cancellation
func filter(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                fmt.Println("Filter cancelled")
                return
            case n, ok := <-in:
                if !ok {
                    return
                }
                if n%2 == 0 {
                    select {
                    case <-ctx.Done():
                        fmt.Println("Filter cancelled")
                        return
                    case out <- n:
                        // Value sent successfully
                    }
                }
            }
        }
    }()
    return out
}

func main() {
    // Create a context with cancellation
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Build and run the pipeline
    gen := generator(ctx)
    proc := processor(ctx, gen)
    filt := filter(ctx, proc)

    // Consume the results
    for n := range filt {
        fmt.Println("Result:", n)
    }
    fmt.Println("Pipeline completed")
}
This pattern ensures the following (a sketch that factors out the repeated boilerplate follows the list):
- Graceful Termination: All stages can clean up resources when cancelled
- Propagation of Cancellation: Cancellation signals propagate through the entire pipeline
- Timeout Support: Pipelines can automatically terminate after a specified duration
- Resource Management: Resources are properly released when the pipeline terminates
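The processor and filter stages above repeat the same cancellation-aware receive-then-send boilerplate. One way to factor it out, sketched here with Go generics (1.18+) and names chosen for this sketch rather than taken from any library, is a single helper that accepts the per-item transformation:

// apply runs f on each input item and forwards results downstream,
// handling cancellation on both the receive and the send.
// f returns the transformed value and whether it should be emitted,
// so both mapping (processor) and filtering (filter) fit the same shape.
func apply[T, U any](ctx context.Context, in <-chan T, f func(T) (U, bool)) <-chan U {
    out := make(chan U)
    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-in:
                if !ok {
                    return
                }
                if u, emit := f(v); emit {
                    select {
                    case <-ctx.Done():
                        return
                    case out <- u:
                    }
                }
            }
        }
    }()
    return out
}

With this helper, the two middle stages of the pipeline reduce to:

proc := apply(ctx, gen, func(n int) (int, bool) { return n * n, true })
filt := apply(ctx, proc, func(n int) (int, bool) { return n, n%2 == 0 })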
These advanced pipeline architectures provide powerful tools for building sophisticated data processing systems. In the next section, we’ll explore specific patterns for stream processing.