Real-Time Data Processing Patterns

Event Processing Patterns

Common patterns for handling event streams:

Windowing Strategies:

  • Tumbling windows: Fixed-size, non-overlapping
  • Sliding windows: Fixed-size, overlapping
  • Session windows: Dynamic size based on activity
  • Global windows: All events in a single window
  • Custom windows: Application-specific logic

Example Windowing Patterns:

Tumbling Windows (5-minute):
|-----|-----|-----|-----|-----|
0     5     10    15    20    25 minutes

Sliding Windows (5-minute window, 1-minute slide):
|-----|
 |-----|
  |-----|
   |-----|
    |-----|
0     5     10    15    20    25 minutes

Session Windows (5-minute gap):
|------|      |-------|      |---|
0      5      12      19     25  27 minutes
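
The three patterns differ only in the window assigner, as the following minimal Flink sketch shows (processing-time variants for brevity; the event-time variants take the same sizes). The Event type, its getKey accessor, and the merge reducer are placeholders assumed for illustration.

Example Window Definitions (Flink):

// Only the window assigner changes between the three patterns
DataStream<Event> events = // ...

// Tumbling: fixed 5-minute windows, no overlap
events.keyBy(Event::getKey)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .reduce((a, b) -> a.merge(b));

// Sliding: 5-minute windows, with a new window starting every minute
events.keyBy(Event::getKey)
    .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1)))
    .reduce((a, b) -> a.merge(b));

// Session: a window closes after 5 minutes of inactivity per key
events.keyBy(Event::getKey)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)))
    .reduce((a, b) -> a.merge(b));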

Stateful Processing:

  • Local state stores
  • Fault-tolerant state management
  • Checkpointing and recovery
  • State backends (memory, RocksDB)
  • State expiration and cleanup
  • Queryable state

Example Stateful Processing (Flink):

// Flink stateful processing example
DataStream<Transaction> transactions = // ...

// Track a per-user balance with keyed state
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getUserId)
    .process(new KeyedProcessFunction<String, Transaction, Alert>() {
        // Declare state for the current balance of each user
        private transient ValueState<Double> balanceState;
        
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Double> descriptor = 
                new ValueStateDescriptor<>("balance", Types.DOUBLE);
            balanceState = getRuntimeContext().getState(descriptor);
        }
        
        @Override
        public void processElement(
            Transaction transaction,
            Context ctx,
            Collector<Alert> out) throws Exception {
            
            // Get current balance or initialize to 0
            Double currentBalance = balanceState.value();
            if (currentBalance == null) {
                currentBalance = 0.0;
            }
            
            // Update balance based on transaction
            double newBalance = currentBalance + transaction.getAmount();
            balanceState.update(newBalance);
            
            // Check for negative balance
            if (newBalance < 0) {
                out.collect(new Alert(
                    transaction.getUserId(),
                    "Negative balance detected",
                    newBalance
                ));
            }
        }
    });

Event-Time Processing:

  • Handling out-of-order events
  • Watermark generation
  • Late event handling
  • Side outputs for late data
  • Time characteristics configuration
  • Event-time windows
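
These pieces compose in a single pipeline: assign watermarks at the source, window on event time, tolerate a bounded amount of lateness, and divert anything later to a side output. A minimal Flink sketch follows; the Event type, its getTimestamp and getKey accessors, and the Count/CountAggregate aggregation are assumed placeholders.

Example Event-Time Processing (Flink):

// Side output tag for events that arrive after the allowed lateness
final OutputTag<Event> lateEvents = new OutputTag<Event>("late-events") {};

DataStream<Event> raw = // ...

// Watermarks: tolerate up to 30 seconds of out-of-order arrival
DataStream<Event> events = raw.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(30))
        .withTimestampAssigner((event, ts) -> event.getTimestamp()));

SingleOutputStreamOperator<Count> counts = events
    .keyBy(Event::getKey)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))  // update results for slightly late events
    .sideOutputLateData(lateEvents)    // anything later goes to the side output
    .aggregate(new CountAggregate());

// Late events can be handled separately, e.g. sent to a corrections topic
DataStream<Event> tooLate = counts.getSideOutput(lateEvents);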

Data Integration Patterns

Connecting real-time data sources and sinks:

Change Data Capture (CDC):

  • Real-time database change monitoring
  • Log-based CDC
  • Query-based CDC
  • Trigger-based CDC
  • Schema evolution handling
  • Consistent snapshots
  • Incremental updates

Example Debezium Configuration (MySQL CDC):

{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "1",
    "database.server.name": "mysql-server",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
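
Downstream consumers see the captured changes as ordinary Kafka records. With the configuration above (Debezium 1.x property names), the MySQL connector writes change events to topics named <server.name>.<database>.<table>, e.g. mysql-server.inventory.customers. A minimal consumer sketch, using plain string deserialization; the group id is an arbitrary choice:

Example CDC Consumer (Java):

// Consume Debezium change events for the inventory.customers table
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "cdc-consumer");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(List.of("mysql-server.inventory.customers"));
    while (true) {
        for (ConsumerRecord<String, String> record :
                consumer.poll(Duration.ofMillis(500))) {
            // Each value is a change envelope with "before", "after",
            // and "op" (c = create, u = update, d = delete) fields
            System.out.println(record.value());
        }
    }
}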

Event Sourcing:

  • Events as the system of record
  • Append-only event log
  • State reconstruction from events
  • Event store implementation
  • Snapshotting for performance
  • CQRS integration
  • Versioning and schema evolution
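
A minimal in-memory sketch of the core idea (not a production event store): the account events and the snapshot comment are assumed for illustration, and records plus pattern matching require Java 16+.

Example Event Sourcing (Java):

import java.util.ArrayList;
import java.util.List;

// Events are the system of record; state is derived by replaying them
interface AccountEvent { }
record Deposited(long amount) implements AccountEvent { }
record Withdrawn(long amount) implements AccountEvent { }

class AccountEventStore {
    // Append-only log: events are only ever added, never changed
    private final List<AccountEvent> log = new ArrayList<>();

    void append(AccountEvent event) {
        log.add(event);
    }

    // Reconstruct current state by folding over the full history
    long currentBalance() {
        long balance = 0;
        for (AccountEvent e : log) {
            if (e instanceof Deposited d) balance += d.amount();
            else if (e instanceof Withdrawn w) balance -= w.amount();
        }
        return balance;
    }

    // Snapshotting (performance): persist (balance, log.size())
    // periodically, then replay only events after that offset
}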

Stream-Table Duality:

  • Streams as tables
  • Tables as streams
  • Materialized views
  • Changelog streams
  • Compacted topics
  • State stores
  • Stream-table joins

Example Stream-Table Join (Kafka Streams):

// Kafka Streams stream-table join example
StreamsBuilder builder = new StreamsBuilder();

// Create a KTable from a compacted topic
KTable<String, Customer> customers = builder.table(
    "customers",
    Consumed.with(Serdes.String(), customerSerde),
    Materialized.as("customers-store")
);

// Create a KStream from an event topic
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderSerde)
);

// Re-key the orders by customer ID, then join against the customer
// table; a KStream-KTable join requires both sides to share the key
KStream<String, EnrichedOrder> enrichedOrders = orders
    .selectKey((orderId, order) -> order.getCustomerId())
    .join(
        customers,
        (order, customer) -> new EnrichedOrder(order, customer),
        Joined.with(Serdes.String(), orderSerde, customerSerde)
    );

// Write the enriched orders to an output topic
enrichedOrders.to(
    "enriched-orders",
    Produced.with(Serdes.String(), enrichedOrderSerde)
);