Stream Processing Patterns
Stream processing involves continuously processing data as it arrives, rather than processing data in batches. Go’s concurrency model is particularly well-suited for implementing stream processing patterns.
Linear Stream Processing
The simplest form of stream processing is a linear pipeline where data flows through a sequence of transformations:
package main

import (
    "fmt"
    "time"
)

// LogEntry represents a single log entry in a stream
type LogEntry struct {
    Timestamp time.Time
    Level     string
    Message   string
}

// source generates a stream of log entries
func source() <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        levels := []string{"INFO", "WARNING", "ERROR", "DEBUG"}
        messages := []string{
            "User logged in",
            "Failed to connect to database",
            "Processing request",
            "Cache miss",
            "Request completed",
        }
        for i := 0; i < 20; i++ {
            entry := LogEntry{
                Timestamp: time.Now().Add(-time.Duration(i) * time.Minute),
                Level:     levels[i%len(levels)],
                Message:   messages[i%len(messages)],
            }
            out <- entry
            time.Sleep(100 * time.Millisecond)
        }
    }()
    return out
}

// filter keeps only entries with the specified log levels
func filter(in <-chan LogEntry, allowedLevels ...string) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        levelSet := make(map[string]bool)
        for _, level := range allowedLevels {
            levelSet[level] = true
        }
        for entry := range in {
            if levelSet[entry.Level] {
                out <- entry
            }
        }
    }()
    return out
}

// enrich adds additional information to log entries
func enrich(in <-chan LogEntry) <-chan LogEntry {
    out := make(chan LogEntry)
    go func() {
        defer close(out)
        for entry := range in {
            // Add additional context to error messages
            if entry.Level == "ERROR" {
                entry.Message = fmt.Sprintf("%s (Contact: [email protected])", entry.Message)
            }
            out <- entry
        }
    }()
    return out
}

// format converts log entries to formatted strings
func format(in <-chan LogEntry) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for entry := range in {
            formatted := fmt.Sprintf(
                "[%s] %s: %s",
                entry.Timestamp.Format("15:04:05"),
                entry.Level,
                entry.Message,
            )
            out <- formatted
        }
    }()
    return out
}

// sink consumes the final stream and performs an action
func sink(in <-chan string) {
    for s := range in {
        fmt.Println(s)
    }
}

func main() {
    // Build the stream processing pipeline
    logs := source()
    filtered := filter(logs, "ERROR", "WARNING")
    enriched := enrich(filtered)
    formatted := format(enriched)

    // Consume the stream
    sink(formatted)
}
This pattern is effective for straightforward transformations where each item is processed independently.
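Every stage in the pipeline above follows the same shape: receive from an inbound channel, transform, send to an outbound channel, and close the output when the input is exhausted. If you find yourself writing many such stages, that shape can be factored out with a small generic helper. The sketch below is not part of the example above; it assumes Go 1.18+ generics, and the name mapStage is ours:

// mapStage applies fn to every value received from in and forwards the result.
// It closes its output once the input channel is closed.
func mapStage[In, Out any](in <-chan In, fn func(In) Out) <-chan Out {
    out := make(chan Out)
    go func() {
        defer close(out)
        for v := range in {
            out <- fn(v)
        }
    }()
    return out
}

With this helper, a stage such as format could be expressed as mapStage(enriched, func(e LogEntry) string { ... }), leaving the pipeline wiring in main unchanged.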
Windowed Stream Processing
Many stream processing applications need to analyze data within time- or count-based windows:
package main

import (
    "fmt"
    "time"
)

// Event represents a data point in a stream
type Event struct {
    Timestamp time.Time
    Value     float64
}

// Window represents a collection of events within a time window
type Window struct {
    StartTime time.Time
    EndTime   time.Time
    Events    []Event
}

// source generates a stream of events
func source() <-chan Event {
    out := make(chan Event)
    go func() {
        defer close(out)
        now := time.Now()
        for i := 0; i < 100; i++ {
            event := Event{
                Timestamp: now.Add(time.Duration(i*100) * time.Millisecond),
                Value:     float64(i % 10),
            }
            out <- event
            time.Sleep(50 * time.Millisecond)
        }
    }()
    return out
}

// tumblingWindow groups events into non-overlapping time windows of fixed duration
func tumblingWindow(in <-chan Event, windowDuration time.Duration) <-chan Window {
    out := make(chan Window)
    go func() {
        defer close(out)
        var currentWindow Window
        var windowStarted bool
        for event := range in {
            // Start a new window for the first event, or when the event
            // falls past the end of the current window
            if !windowStarted || event.Timestamp.After(currentWindow.EndTime) {
                // Emit the window in progress, if any
                if windowStarted {
                    out <- currentWindow
                }
                // Start a new window anchored at this event
                windowStart := event.Timestamp
                currentWindow = Window{
                    StartTime: windowStart,
                    EndTime:   windowStart.Add(windowDuration),
                    Events:    []Event{event},
                }
                windowStarted = true
            } else {
                // Add the event to the current window
                currentWindow.Events = append(currentWindow.Events, event)
            }
        }
        // Emit the final window if it has events
        if windowStarted && len(currentWindow.Events) > 0 {
            out <- currentWindow
        }
    }()
    return out
}

// slidingWindow groups events into overlapping time windows that advance
// by slideInterval
func slidingWindow(in <-chan Event, windowDuration, slideInterval time.Duration) <-chan Window {
    out := make(chan Window)
    go func() {
        defer close(out)
        // Buffer of events that may still fall into a future window
        var buffer []Event
        // Event timestamp at which the last window was emitted
        var lastEmit time.Time
        for event := range in {
            buffer = append(buffer, event)
            // Remember where the stream started so the first window can be timed
            if lastEmit.IsZero() {
                lastEmit = event.Timestamp
            }
            // Emit a window once slideInterval has elapsed since the last emission
            if event.Timestamp.Sub(lastEmit) >= slideInterval {
                windowEnd := event.Timestamp
                windowStart := windowEnd.Add(-windowDuration)
                window := Window{
                    StartTime: windowStart,
                    EndTime:   windowEnd,
                }
                // Collect the buffered events that fall inside the window
                for _, e := range buffer {
                    if !e.Timestamp.Before(windowStart) && !e.Timestamp.After(windowEnd) {
                        window.Events = append(window.Events, e)
                    }
                }
                // Emit the window if it has events
                if len(window.Events) > 0 {
                    out <- window
                }
                lastEmit = event.Timestamp
                // Drop events that can no longer appear in any future window
                var kept []Event
                for _, e := range buffer {
                    if !e.Timestamp.Before(windowStart) {
                        kept = append(kept, e)
                    }
                }
                buffer = kept
            }
        }
    }()
    return out
}

// aggregate computes statistics for each window
func aggregate(in <-chan Window) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for window := range in {
            // Calculate the average value in the window
            sum := 0.0
            for _, event := range window.Events {
                sum += event.Value
            }
            avg := sum / float64(len(window.Events))

            result := fmt.Sprintf(
                "Window [%s to %s]: %d events, avg=%.2f",
                window.StartTime.Format("15:04:05.000"),
                window.EndTime.Format("15:04:05.000"),
                len(window.Events),
                avg,
            )
            out <- result
        }
    }()
    return out
}

func main() {
    events := source()

    // Apply a tumbling-window strategy and aggregate each window
    tumbling := tumblingWindow(events, 1*time.Second)
    tumblingResults := aggregate(tumbling)

    // Consume and print the results
    for result := range tumblingResults {
        fmt.Println(result)
    }
}
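The main function above exercises only the tumbling window; since the events channel can feed a single consumer, demonstrating slidingWindow means swapping it in (or duplicating the source). A possible drop-in replacement for main, with window and slide durations chosen arbitrarily for illustration:

func main() {
    events := source()

    // Overlapping 2-second windows that advance every 500ms of event time
    sliding := slidingWindow(events, 2*time.Second, 500*time.Millisecond)
    for result := range aggregate(sliding) {
        fmt.Println(result)
    }
}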
Windowed processing is essential for:
- Time-Based Analysis: Computing statistics over time periods
- Trend Detection: Identifying patterns in streaming data
- Anomaly Detection: Detecting unusual behavior in real-time
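The windows above are defined by time. A count-based window, mentioned at the start of this section, instead emits after a fixed number of events. A minimal sketch using the same Event and Window types as the example above (the name countWindow is ours):

// countWindow groups events into non-overlapping windows of size events each;
// the final window may be smaller if the stream ends early.
func countWindow(in <-chan Event, size int) <-chan Window {
    out := make(chan Window)
    go func() {
        defer close(out)
        var current []Event
        for event := range in {
            current = append(current, event)
            if len(current) == size {
                out <- Window{
                    StartTime: current[0].Timestamp,
                    EndTime:   current[len(current)-1].Timestamp,
                    Events:    current,
                }
                current = nil
            }
        }
        // Emit any remaining events as a final, partial window
        if len(current) > 0 {
            out <- Window{
                StartTime: current[0].Timestamp,
                EndTime:   current[len(current)-1].Timestamp,
                Events:    current,
            }
        }
    }()
    return out
}

Because the window boundaries come from the data itself, countWindow plugs into the same aggregate stage without any changes.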
Stateful Stream Processing
Some stream processing applications need to maintain state across multiple events:
package main

import (
    "fmt"
    "time"
)

// Transaction represents a financial transaction
type Transaction struct {
    ID        string
    UserID    string
    Amount    float64
    Timestamp time.Time
}

// Alert represents a fraud detection alert
type Alert struct {
    UserID       string
    Reason       string
    Transactions []Transaction
    Timestamp    time.Time
}

// source generates a stream of transactions
func source() <-chan Transaction {
    out := make(chan Transaction)
    go func() {
        defer close(out)
        users := []string{"user1", "user2", "user3", "user4"}
        now := time.Now()

        for i := 0; i < 20; i++ {
            userID := users[i%len(users)]
            amount := 10.0 + float64(i%5)*20.0

            // For user3, generate suspicious, high-value transactions later in the stream
            if userID == "user3" && i > 10 {
                amount = 500.0 + float64(i%3)*200.0
            }

            tx := Transaction{
                ID:        fmt.Sprintf("tx-%d", i),
                UserID:    userID,
                Amount:    amount,
                Timestamp: now.Add(time.Duration(i*30) * time.Second),
            }
            out <- tx
            time.Sleep(100 * time.Millisecond)
        }
    }()
    return out
}

// fraudDetector maintains state to detect suspicious patterns
func fraudDetector(in <-chan Transaction) <-chan Alert {
    out := make(chan Alert)
    go func() {
        defer close(out)
        // State: recent transactions by user
        userTransactions := make(map[string][]Transaction)
        // State: running total amount by user (simplified: no time window is enforced)
        userAmounts := make(map[string]float64)

        for tx := range in {
            // Add the transaction to the user's history and running total
            userTransactions[tx.UserID] = append(userTransactions[tx.UserID], tx)
            userAmounts[tx.UserID] += tx.Amount

            // Check for suspicious activity
            if userAmounts[tx.UserID] > 1000.0 {
                // Create an alert carrying the transactions that triggered it
                alert := Alert{
                    UserID:       tx.UserID,
                    Reason:       "High transaction volume",
                    Transactions: userTransactions[tx.UserID],
                    Timestamp:    time.Now(),
                }
                out <- alert

                // Reset state for this user
                userAmounts[tx.UserID] = 0
                userTransactions[tx.UserID] = nil
            }
        }
    }()
    return out
}

// alertHandler processes alerts
func alertHandler(in <-chan Alert) {
    for alert := range in {
        fmt.Printf("ALERT for %s: %s\n", alert.UserID, alert.Reason)
        fmt.Printf("  Total transactions: %d\n", len(alert.Transactions))
        latest := alert.Transactions[len(alert.Transactions)-1]
        fmt.Printf("  Latest transaction: %s for $%.2f\n", latest.ID, latest.Amount)
    }
}

func main() {
    // Build the stateful stream processing pipeline
    transactions := source()
    alerts := fraudDetector(transactions)

    // Process alerts
    alertHandler(alerts)
}
This pattern enables:
- State Maintenance: Keeping track of information across multiple events
- Pattern Recognition: Identifying complex patterns that span multiple events
- Aggregation: Computing running statistics over a stream of events
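The fraud detector above keeps a running total per user but, as its comment notes, never expires old transactions, so "high volume" really means "high volume ever". To make the threshold apply to a true time window, the detector can prune each user's history before checking the total. A sketch of that pruning step, reusing the Transaction type from the example; the helper pruneOld is illustrative, not part of the code above:

// pruneOld removes transactions older than maxAge relative to now and
// returns the surviving transactions together with their total amount.
func pruneOld(history []Transaction, now time.Time, maxAge time.Duration) ([]Transaction, float64) {
    cutoff := now.Add(-maxAge)
    var kept []Transaction
    total := 0.0
    for _, t := range history {
        if t.Timestamp.After(cutoff) {
            kept = append(kept, t)
            total += t.Amount
        }
    }
    return kept, total
}

Inside fraudDetector, the per-transaction step would then append the new transaction, call pruneOld with tx.Timestamp as now and time.Hour as maxAge, and compare the returned total against the threshold, removing the need for the separate userAmounts map.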
These stream processing patterns provide powerful tools for building real-time data processing systems. In the next section, we’ll explore error handling and recovery strategies for pipelines.