Stream Processing Patterns

Stream processing means continuously handling data as it arrives, rather than collecting it into discrete batches first. Go’s concurrency model is particularly well-suited to implementing stream processing patterns.

Linear Stream Processing

The simplest form of stream processing is a linear pipeline where data flows through a sequence of transformations:

package main

import (
	"fmt"
	"strings"
	"time"
)

// LogEntry represents a log entry in a stream
type LogEntry struct {
	Timestamp time.Time
	Level     string
	Message   string
}

// Source generates a stream of log entries
func source() <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		levels := []string{"INFO", "WARNING", "ERROR", "DEBUG"}
		messages := []string{
			"User logged in",
			"Failed to connect to database",
			"Processing request",
			"Cache miss",
			"Request completed",
		}
		
		for i := 0; i < 20; i++ {
			entry := LogEntry{
				Timestamp: time.Now().Add(-time.Duration(i) * time.Minute),
				Level:     levels[i%len(levels)],
				Message:   messages[i%len(messages)],
			}
			out <- entry
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// Filter keeps only entries with specified log levels
func filter(in <-chan LogEntry, allowedLevels ...string) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		levelSet := make(map[string]bool)
		for _, level := range allowedLevels {
			levelSet[level] = true
		}
		
		for entry := range in {
			if levelSet[entry.Level] {
				out <- entry
			}
		}
	}()
	return out
}

// Enrich adds additional information to log entries
func enrich(in <-chan LogEntry) <-chan LogEntry {
	out := make(chan LogEntry)
	go func() {
		defer close(out)
		for entry := range in {
			// Add additional context to error messages
			if entry.Level == "ERROR" {
				entry.Message = fmt.Sprintf("%s (Contact: support@example.com)", entry.Message)
			}
			out <- entry
		}
	}()
	return out
}

// Format converts log entries to formatted strings
func format(in <-chan LogEntry) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for entry := range in {
			formatted := fmt.Sprintf(
				"[%s] %s: %s",
				entry.Timestamp.Format("15:04:05"),
				entry.Level,
				entry.Message,
			)
			out <- formatted
		}
	}()
	return out
}

// Sink consumes the final stream and performs an action
func sink(in <-chan string) {
	for s := range in {
		fmt.Println(s)
	}
}

func main() {
	// Build the stream processing pipeline
	logs := source()
	filtered := filter(logs, "ERROR", "WARNING")
	enriched := enrich(filtered)
	formatted := format(enriched)
	
	// Consume the stream
	sink(formatted)
}

This pattern is effective for straightforward transformations where each item is processed independently.
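
Each stage above repeats the same channel plumbing: make an output channel, launch a goroutine, drain the input, close the output. On Go 1.18 or later, a small generic helper can factor that out. The stage function below is a sketch of that idea rather than part of the example above:

package main

import "fmt"

// stage wraps the plumbing shared by every mapping stage: it reads from
// in, applies fn to each item, sends the result on out, and closes out
// once in is drained. Requires Go 1.18+ for type parameters.
func stage[In, Out any](in <-chan In, fn func(In) Out) <-chan Out {
	out := make(chan Out)
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

func main() {
	nums := make(chan int)
	go func() {
		defer close(nums)
		for i := 1; i <= 3; i++ {
			nums <- i
		}
	}()
	
	// Two stages built from the same helper: square, then label
	squared := stage(nums, func(n int) int { return n * n })
	labeled := stage(squared, func(n int) string { return fmt.Sprintf("value=%d", n) })
	
	for s := range labeled {
		fmt.Println(s)
	}
}

A filter stage does not fit the one-in, one-out shape of stage and would keep its own loop, but transformations like enrich and format collapse to one-line function literals.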

Windowed Stream Processing

Many stream processing applications need to analyze data within time- or count-based windows:

package main

import (
	"fmt"
	"time"
)

// Event represents a data point in a stream
type Event struct {
	Timestamp time.Time
	Value     float64
}

// Window represents a collection of events within a time window
type Window struct {
	StartTime time.Time
	EndTime   time.Time
	Events    []Event
}

// source generates a stream of events
func source() <-chan Event {
	out := make(chan Event)
	go func() {
		defer close(out)
		now := time.Now()
		for i := 0; i < 100; i++ {
			event := Event{
				Timestamp: now.Add(time.Duration(i*100) * time.Millisecond),
				Value:     float64(i % 10),
			}
			out <- event
			time.Sleep(50 * time.Millisecond)
		}
	}()
	return out
}

// tumblingWindow groups events into non-overlapping time windows of fixed duration
func tumblingWindow(in <-chan Event, windowDuration time.Duration) <-chan Window {
	out := make(chan Window)
	go func() {
		defer close(out)
		
		var currentWindow Window
		var windowStarted bool
		
		for event := range in {
			// If this is the first event or the event belongs to a new window
			if !windowStarted || event.Timestamp.After(currentWindow.EndTime) {
				// If we have a window in progress, emit it
				if windowStarted {
					out <- currentWindow
				}
				
				// Start a new window
				windowStart := event.Timestamp
				windowEnd := windowStart.Add(windowDuration)
				currentWindow = Window{
					StartTime: windowStart,
					EndTime:   windowEnd,
					Events:    []Event{event},
				}
				windowStarted = true
			} else {
				// Add event to current window
				currentWindow.Events = append(currentWindow.Events, event)
			}
		}
		
		// Emit the last window if it has events
		if windowStarted && len(currentWindow.Events) > 0 {
			out <- currentWindow
		}
	}()
	return out
}

// slidingWindow groups events into overlapping time windows
func slidingWindow(in <-chan Event, windowDuration, slideInterval time.Duration) <-chan Window {
	out := make(chan Window)
	go func() {
		defer close(out)
		
		// Buffer to hold events for windowing
		var buffer []Event
		
		// Timestamp of the last window emission
		var lastEmit time.Time
		
		for event := range in {
			// Add new event to buffer
			buffer = append(buffer, event)
			
			// Initialize the emission clock on the first event
			if lastEmit.IsZero() {
				lastEmit = event.Timestamp
			}
			
			// Bounds of the window that would end at this event
			windowEnd := event.Timestamp
			windowStart := windowEnd.Add(-windowDuration)
			
			// Emit a window once the slide interval has passed since the last emission
			if event.Timestamp.Sub(lastEmit) >= slideInterval {
				// Create window with events in the time range
				window := Window{
					StartTime: windowStart,
					EndTime:   windowEnd,
					Events:    []Event{},
				}
				
				for _, e := range buffer {
					if !e.Timestamp.Before(windowStart) && !e.Timestamp.After(windowEnd) {
						window.Events = append(window.Events, e)
					}
				}
				
				// Emit window if it has events
				if len(window.Events) > 0 {
					out <- window
				}
				
				// Record the time of this emission
				lastEmit = event.Timestamp
				
				// Drop events too old to appear in any future window
				newBuffer := []Event{}
				for _, e := range buffer {
					if !e.Timestamp.Before(windowStart) {
						newBuffer = append(newBuffer, e)
					}
				}
				buffer = newBuffer
			}
		}
	}()
	return out
}

// aggregate computes statistics for each window
func aggregate(in <-chan Window) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for window := range in {
			// Calculate average
			sum := 0.0
			for _, event := range window.Events {
				sum += event.Value
			}
			avg := sum / float64(len(window.Events))
			
			result := fmt.Sprintf(
				"Window [%s to %s]: %d events, avg=%.2f",
				window.StartTime.Format("15:04:05.000"),
				window.EndTime.Format("15:04:05.000"),
				len(window.Events),
				avg,
			)
			out <- result
		}
	}()
	return out
}

func main() {
	events := source()
	
	// Apply tumbling windows (slidingWindow could be swapped in here;
	// a single channel can feed only one windowing stage at a time)
	tumbling := tumblingWindow(events, 1*time.Second)
	tumblingResults := aggregate(tumbling)
	
	// Consume and print results
	for result := range tumblingResults {
		fmt.Println(result)
	}
}
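
The opening paragraph of this section mentions count-based as well as time-based windows, while the example above only implements the latter. The following is a minimal sketch of a count-based tumbling window; the Event and Window types are repeated so the program stands alone:

package main

import (
	"fmt"
	"time"
)

// Event and Window are repeated from the example above so this sketch
// compiles on its own.
type Event struct {
	Timestamp time.Time
	Value     float64
}

type Window struct {
	StartTime time.Time
	EndTime   time.Time
	Events    []Event
}

// countWindow emits a window for every `size` consecutive events,
// regardless of how much time they span.
func countWindow(in <-chan Event, size int) <-chan Window {
	out := make(chan Window)
	go func() {
		defer close(out)
		var batch []Event
		emit := func() {
			out <- Window{
				StartTime: batch[0].Timestamp,
				EndTime:   batch[len(batch)-1].Timestamp,
				Events:    batch,
			}
			batch = nil
		}
		for event := range in {
			batch = append(batch, event)
			if len(batch) == size {
				emit()
			}
		}
		// Flush a final partial window, if any
		if len(batch) > 0 {
			emit()
		}
	}()
	return out
}

func main() {
	events := make(chan Event)
	go func() {
		defer close(events)
		now := time.Now()
		for i := 0; i < 7; i++ {
			events <- Event{Timestamp: now.Add(time.Duration(i) * time.Second), Value: float64(i)}
		}
	}()
	
	for w := range countWindow(events, 3) {
		fmt.Printf("window of %d events [%s .. %s]\n",
			len(w.Events),
			w.StartTime.Format("15:04:05"),
			w.EndTime.Format("15:04:05"))
	}
}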

Windowed processing is essential for:

  1. Time-Based Analysis: Computing statistics over time periods
  2. Trend Detection: Identifying patterns in streaming data
  3. Anomaly Detection: Detecting unusual behavior in real time (see the sketch below)
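
As a toy illustration of the third point, the stage below flags any window whose average strays from an expected baseline. The baseline and tolerance values are arbitrary assumptions for this sketch, as is the windowStats type standing in for an aggregated window summary:

package main

import "fmt"

// windowStats is a stand-in for an aggregated window summary; in the
// example above it would be produced by a stage like aggregate.
type windowStats struct {
	Label string
	Avg   float64
}

// flagAnomalies passes summaries through and marks any whose average
// strays more than tolerance from the expected baseline. Both numbers
// are arbitrary assumptions for this sketch.
func flagAnomalies(in <-chan windowStats, baseline, tolerance float64) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for w := range in {
			status := "ok"
			if w.Avg > baseline+tolerance || w.Avg < baseline-tolerance {
				status = "ANOMALY"
			}
			out <- fmt.Sprintf("%s: avg=%.2f [%s]", w.Label, w.Avg, status)
		}
	}()
	return out
}

func main() {
	stats := make(chan windowStats)
	go func() {
		defer close(stats)
		stats <- windowStats{"w1", 4.5}
		stats <- windowStats{"w2", 9.8} // spikes well above baseline
		stats <- windowStats{"w3", 4.2}
	}()
	
	for line := range flagAnomalies(stats, 4.5, 2.0) {
		fmt.Println(line)
	}
}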

Stateful Stream Processing

Some stream processing applications need to maintain state across multiple events:

package main

import (
	"fmt"
	"time"
)

// Transaction represents a financial transaction
type Transaction struct {
	ID        string
	UserID    string
	Amount    float64
	Timestamp time.Time
}

// Alert represents a fraud detection alert
type Alert struct {
	UserID       string
	Reason       string
	Transactions []Transaction
	Timestamp    time.Time
}

// source generates a stream of transactions
func source() <-chan Transaction {
	out := make(chan Transaction)
	go func() {
		defer close(out)
		users := []string{"user1", "user2", "user3", "user4"}
		now := time.Now()
		
		// Generate transactions (mostly normal, suspicious for user3 later on)
		for i := 0; i < 20; i++ {
			userID := users[i%len(users)]
			amount := 10.0 + float64(i%5)*20.0
			
			// For user3, generate suspicious transactions
			if userID == "user3" && i > 10 {
				amount = 500.0 + float64(i%3)*200.0
			}
			
			tx := Transaction{
				ID:        fmt.Sprintf("tx-%d", i),
				UserID:    userID,
				Amount:    amount,
				Timestamp: now.Add(time.Duration(i*30) * time.Second),
			}
			out <- tx
			time.Sleep(100 * time.Millisecond)
		}
	}()
	return out
}

// fraudDetector maintains state to detect suspicious patterns
func fraudDetector(in <-chan Transaction) <-chan Alert {
	out := make(chan Alert)
	go func() {
		defer close(out)
		
		// State: track recent transactions by user
		userTransactions := make(map[string][]Transaction)
		
		// State: track total amount in last hour by user
		userAmounts := make(map[string]float64)
		
		for tx := range in {
			// Add transaction to user history
			userTransactions[tx.UserID] = append(userTransactions[tx.UserID], tx)
			
			// Update total amount (simplified - old transactions are never expired; see the sketch after this example)
			userAmounts[tx.UserID] += tx.Amount
			
			// Check for suspicious activity
			if userAmounts[tx.UserID] > 1000.0 {
				// Create the alert, carrying the transactions that triggered it
				alert := Alert{
					UserID:       tx.UserID,
					Reason:       "High transaction volume",
					Transactions: userTransactions[tx.UserID],
					Timestamp:    time.Now(),
				}
				out <- alert
				
				// Reset state for this user
				userAmounts[tx.UserID] = 0
				userTransactions[tx.UserID] = nil
			}
		}
	}()
	return out
}

// alertHandler processes alerts
func alertHandler(in <-chan Alert) {
	for alert := range in {
		fmt.Printf("ALERT for %s: %s\n", alert.UserID, alert.Reason)
		fmt.Printf("  Total transactions: %d\n", len(alert.Transactions))
		fmt.Printf("  Latest transaction: %s for $%.2f\n",
			alert.Transactions[len(alert.Transactions)-1].ID,
			alert.Transactions[len(alert.Transactions)-1].Amount)
	}
}

func main() {
	// Build the stateful stream processing pipeline
	transactions := source()
	alerts := fraudDetector(transactions)
	
	// Process alerts
	alertHandler(alerts)
}

This pattern enables:

  1. State Maintenance: Keeping track of information across multiple events
  2. Pattern Recognition: Identifying complex patterns that span multiple events
  3. Aggregation: Computing running statistics over a stream of events
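
The fraudDetector above notes that its running total is simplified: old transactions are never expired, so a user's total only ever grows between alerts. A sketch of the missing pruning step might look like the following; pruneAndSum is a hypothetical helper operating on the same Transaction shape (UserID omitted for brevity):

package main

import (
	"fmt"
	"time"
)

// Transaction mirrors the shape used in the example above (UserID
// omitted for brevity).
type Transaction struct {
	ID        string
	Amount    float64
	Timestamp time.Time
}

// pruneAndSum drops transactions older than the window and returns the
// surviving history along with its total amount.
func pruneAndSum(history []Transaction, now time.Time, window time.Duration) ([]Transaction, float64) {
	cutoff := now.Add(-window)
	kept := history[:0] // reuse the backing array
	total := 0.0
	for _, tx := range history {
		if tx.Timestamp.After(cutoff) {
			kept = append(kept, tx)
			total += tx.Amount
		}
	}
	return kept, total
}

func main() {
	now := time.Now()
	history := []Transaction{
		{ID: "tx-1", Amount: 400, Timestamp: now.Add(-90 * time.Minute)}, // expired
		{ID: "tx-2", Amount: 300, Timestamp: now.Add(-30 * time.Minute)},
		{ID: "tx-3", Amount: 500, Timestamp: now.Add(-5 * time.Minute)},
	}
	
	history, total := pruneAndSum(history, now, time.Hour)
	fmt.Printf("%d transactions in the last hour, total $%.2f\n", len(history), total)
}

Inside fraudDetector, the per-transaction update would call a helper like this before comparing the user's total against the threshold, so stale activity ages out instead of accumulating forever.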

These stream processing patterns provide powerful tools for building real-time data processing systems. In the next section, we’ll explore error handling and recovery strategies for pipelines.