Real-Time Data Processing Patterns
Event Processing Patterns
Common patterns for handling event streams:
Windowing Strategies:
- Tumbling windows: Fixed-size, non-overlapping
- Sliding windows: Fixed-size, overlapping
- Session windows: Dynamic size based on activity
- Global windows: All events in a single window
- Custom windows: Application-specific logic
Example Windowing Patterns:
Tumbling Windows (5-minute):
|---------|---------|---------|---------|---------|
0         5         10        15        20        25 minutes
Sliding Windows (5-minute windows, sliding by 1 minute):
|---------|
  |---------|
    |---------|
      |---------|
        |---------|
0         5         10 minutes
Session Windows (2-minute inactivity gap):
|---------|     |-------------|     |---|
0         5     8             15    18  20 minutes
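The assignment rules behind these diagrams can be sketched in plain Java (16+, for records). This is a simplified model, not the Flink or Kafka Streams API: the class and method names are hypothetical, timestamps are plain minutes, windows are half-open [start, end), and the session function assumes its input is already sorted by time. A session here ends one gap after its last event.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified window assignment (hypothetical names, not a framework API).
// Timestamps are in minutes and every window is half-open: [start, end).
final class WindowAssigners {

    record Window(long start, long end) {}

    // Tumbling: a timestamp belongs to exactly one fixed-size window.
    static Window tumbling(long ts, long size) {
        long start = ts - Math.floorMod(ts, size);
        return new Window(start, start + size);
    }

    // Sliding: a timestamp belongs to size / slide overlapping windows.
    static List<Window> sliding(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest slide-aligned start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // Session: consecutive events at most `gap` apart share a window, which ends
    // `gap` after its last event. Timestamps must already be sorted.
    static List<Window> sessions(long[] sortedTs, long gap) {
        List<Window> windows = new ArrayList<>();
        if (sortedTs.length == 0) {
            return windows;
        }
        long start = sortedTs[0];
        long last = sortedTs[0];
        for (int i = 1; i < sortedTs.length; i++) {
            if (sortedTs[i] - last > gap) { // inactivity longer than the gap closes the session
                windows.add(new Window(start, last + gap));
                start = sortedTs[i];
            }
            last = sortedTs[i];
        }
        windows.add(new Window(start, last + gap));
        return windows;
    }

    public static void main(String[] args) {
        System.out.println(tumbling(7, 5));
        System.out.println(sliding(7, 5, 1)); // five windows, starting at 7 down to 3
        System.out.println(sessions(new long[] {0, 1, 3, 8, 10, 12, 13, 18}, 2));
    }
}
```

With a 2-minute gap, main groups events at 0, 1, 3, 8, 10, 12, 13 and 18 into [0,5), [8,15) and [18,20), the spans marked on the session axis.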
Stateful Processing:
- Local state stores
- Fault-tolerant state management
- Checkpointing and recovery
- State backends (memory, RocksDB)
- State expiration and cleanup
- Queryable state
Example Stateful Processing (Flink):
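// Flink stateful processing example (Flink 1.x DataStream API)
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Transaction and Alert are application classes; the source definition is elided
DataStream<Transaction> transactions = ...;

// Key the stream by user so each user has an independent balance state.
// KeyedProcessFunction is the intended function type for keyed state; the
// String key type assumes getUserId() returns a String.
DataStream<Alert> alerts = transactions
    .keyBy(Transaction::getUserId)
    .process(new KeyedProcessFunction<String, Transaction, Alert>() {
        // Per-key state holding the current balance
        private transient ValueState<Double> balanceState;

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Double> descriptor =
                new ValueStateDescriptor<>("balance", Types.DOUBLE);
            balanceState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(
                Transaction transaction,
                Context ctx,
                Collector<Alert> out) throws Exception {
            // value() returns null until this key's state is first updated
            Double currentBalance = balanceState.value();
            if (currentBalance == null) {
                currentBalance = 0.0;
            }

            // Update balance based on transaction
            double newBalance = currentBalance + transaction.getAmount();
            balanceState.update(newBalance);

            // Emit an alert when the balance goes negative
            if (newBalance < 0) {
                out.collect(new Alert(
                    transaction.getUserId(),
                    "Negative balance detected",
                    newBalance
                ));
            }
        }
    });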
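Checkpointing and recovery, from the stateful-processing list above, can also be modeled without a framework: a checkpoint captures keyed state together with the input position it reflects, and recovery restores both and replays the input from that position. The sketch assumes an ordered, replayable input (as a Kafka partition provides) and uses hypothetical names; it is a conceptual model, not how Flink's state backends are implemented.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of checkpoint-and-replay recovery for keyed state (Java 16+,
// hypothetical names; not how Flink's state backends are implemented).
final class CheckpointedBalances {

    // Hypothetical input event: a signed amount for one user.
    record Txn(String userId, double amount) {}

    // A consistent snapshot: keyed state plus the input offset that state reflects.
    record Checkpoint(Map<String, Double> state, int nextOffset) {}

    private Map<String, Double> balances = new HashMap<>();
    private int nextOffset = 0; // position in the replayable input

    // Processes input[nextOffset, upTo) and advances the offset.
    void processUpTo(List<Txn> input, int upTo) {
        for (; nextOffset < upTo; nextOffset++) {
            Txn t = input.get(nextOffset);
            balances.merge(t.userId(), t.amount(), Double::sum);
        }
    }

    // State and offset are captured together so they stay consistent with each other.
    Checkpoint checkpoint() {
        return new Checkpoint(Map.copyOf(balances), nextOffset);
    }

    // Recovery: discard in-memory state and rewind; replay resumes at the saved offset.
    void restore(Checkpoint cp) {
        balances = new HashMap<>(cp.state());
        nextOffset = cp.nextOffset();
    }

    double balance(String userId) {
        return balances.getOrDefault(userId, 0.0);
    }

    public static void main(String[] args) {
        List<Txn> input = List.of(new Txn("alice", 100), new Txn("bob", 50),
                new Txn("alice", -30), new Txn("alice", -90));
        CheckpointedBalances job = new CheckpointedBalances();
        job.processUpTo(input, 2);
        Checkpoint cp = job.checkpoint(); // alice=100.0, bob=50.0, offset 2
        job.processUpTo(input, 3);        // alice=70.0, then a simulated crash
        job.restore(cp);                  // back to alice=100.0 at offset 2
        job.processUpTo(input, 4);        // replays -30 and -90, each applied once
        System.out.println(job.balance("alice")); // -20.0
    }
}
```

Because state and offset are restored as a pair, the -30 transaction that was processed before the simulated crash is counted only once after recovery.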
Event-Time Processing:
- Handling out-of-order events
- Watermark generation
- Late event handling
- Side outputs for late data
- Time characteristics configuration
- Event-time windows
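Watermarks, event-time windows, and side outputs for late data interact as follows in a small single-threaded model. It assumes a watermark equal to the largest timestamp seen minus a fixed out-of-orderness bound (the idea behind Flink's bounded-out-of-orderness watermark strategy), but it omits allowed lateness, parallel sources, and idle-source handling. The class and its fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified single-threaded model of event-time processing (not a Flink API):
// bounded-out-of-orderness watermarks, tumbling count windows, late-data side output.
final class EventTimeCounter {
    private final long windowSize;
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    // Open windows by start time; TreeMap iterates them in event-time order.
    private final TreeMap<Long, Integer> openWindows = new TreeMap<>();
    final List<String> results = new ArrayList<>();  // fired windows as "[start,end)=count"
    final List<Long> lateEvents = new ArrayList<>(); // side output for late data

    EventTimeCounter(long windowSize, long maxOutOfOrderness) {
        this.windowSize = windowSize;
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void onEvent(long ts) {
        long start = ts - Math.floorMod(ts, windowSize);
        if (start + windowSize <= watermark) {
            lateEvents.add(ts); // its window has already fired
            return;
        }
        openWindows.merge(start, 1, Integer::sum);
        // The watermark trails the largest timestamp seen by the out-of-orderness bound.
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        advanceWatermark(maxTimestampSeen - maxOutOfOrderness);
    }

    private void advanceWatermark(long candidate) {
        if (candidate <= watermark) {
            return; // watermarks never move backwards
        }
        watermark = candidate;
        // Fire (emit and discard) every window that ends at or before the watermark.
        while (!openWindows.isEmpty() && openWindows.firstKey() + windowSize <= watermark) {
            Map.Entry<Long, Integer> w = openWindows.pollFirstEntry();
            results.add("[" + w.getKey() + "," + (w.getKey() + windowSize) + ")=" + w.getValue());
        }
    }

    public static void main(String[] args) {
        EventTimeCounter counter = new EventTimeCounter(5, 2);
        for (long ts : new long[] {1, 3, 2, 6, 8, 4, 12, 3}) {
            counter.onEvent(ts);
        }
        System.out.println(counter.results);    // [[0,5)=3, [5,10)=2]
        System.out.println(counter.lateEvents); // [4, 3]
    }
}
```

The event at 2 arrives out of order but within the 2-minute bound, so it is still counted; the events at 4 and 3 arrive after the watermark has passed the end of [0,5) and go to the side output. The window [10,15) stays open because the watermark (10) never passes its end.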
Data Integration Patterns
Connecting real-time data sources and sinks:
Change Data Capture (CDC):
- Real-time database change monitoring
- Log-based CDC
- Query-based CDC
- Trigger-based CDC
- Schema evolution handling
- Consistent snapshots
- Incremental updates
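Example Debezium Configuration (MySQL CDC, Debezium 1.x property names):
In Debezium 2.x, database.server.name is replaced by topic.prefix, and the database.history.kafka.* properties by schema.history.internal.kafka.*.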
{
  "name": "mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "1",
    "database.server.name": "mysql-server",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers,inventory.orders",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
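On the consuming side, each Debezium change event carries an op code (c for create, u for update, d for delete, r for rows read during the initial snapshot) together with before and after row images. The sketch below applies such events to an in-memory replica keyed by primary key. The flattened ChangeEvent record is a hypothetical stand-in for the real JSON or Avro envelope, and schema changes, tombstone records, and error handling are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Applies simplified Debezium-style change events to an in-memory replica (Java 16+).
// ChangeEvent is a hypothetical flattening of the real before/after/source/op envelope.
final class ChangeEventApplier {

    // op: "c" = create, "u" = update, "d" = delete, "r" = read during a snapshot.
    record ChangeEvent(String op, long id, Map<String, Object> after) {}

    private final Map<Long, Map<String, Object>> replica = new HashMap<>();

    void apply(ChangeEvent e) {
        switch (e.op()) {
            case "c", "u", "r" -> replica.put(e.id(), e.after()); // upsert the after-image
            case "d" -> replica.remove(e.id());                    // deletes carry no after-image
            default -> throw new IllegalArgumentException("unknown op: " + e.op());
        }
    }

    Map<String, Object> row(long id) {
        return replica.get(id);
    }

    int size() {
        return replica.size();
    }

    public static void main(String[] args) {
        ChangeEventApplier applier = new ChangeEventApplier();
        applier.apply(new ChangeEvent("r", 1, Map.of("name", "Ann"))); // initial snapshot
        applier.apply(new ChangeEvent("c", 2, Map.of("name", "Bob")));
        applier.apply(new ChangeEvent("u", 1, Map.of("name", "Anne")));
        applier.apply(new ChangeEvent("d", 2, null));
        System.out.println(applier.size() + " " + applier.row(1).get("name")); // 1 Anne
    }
}
```

Treating snapshot reads and creates alike as upserts is what lets a consumer start from a consistent snapshot and then continue seamlessly with incremental updates.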
Event Sourcing:
- Events as the system of record
- Append-only event log
- State reconstruction from events
- Event store implementation
- Snapshotting for performance
- CQRS integration
- Versioning and schema evolution
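The core of event sourcing (an append-only log as the system of record, state rebuilt by replaying events, and snapshots to shorten replay) fits in a few lines of Java 16+. This is a single-process sketch with hypothetical event types; a real event store would add persistence, concurrency control on append, and event versioning.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal event-sourced account; event types and names are hypothetical.
final class EventSourcedAccount {

    interface Event {}
    record Deposited(long amount) implements Event {}
    record Withdrawn(long amount) implements Event {}

    // State as of a log position, so a rebuild can skip the events before it.
    record Snapshot(long balance, int version) {}

    private final List<Event> log = new ArrayList<>(); // append-only system of record

    void append(Event e) {
        log.add(e);
    }

    // Pure state transition: current state + one event -> next state.
    static long apply(long balance, Event e) {
        if (e instanceof Deposited d) {
            return balance + d.amount();
        }
        if (e instanceof Withdrawn w) {
            return balance - w.amount();
        }
        throw new IllegalArgumentException("unknown event: " + e);
    }

    // Rebuilds state by replaying every event recorded after the snapshot.
    long replayFrom(Snapshot s) {
        long balance = s.balance();
        for (int i = s.version(); i < log.size(); i++) {
            balance = apply(balance, log.get(i));
        }
        return balance;
    }

    Snapshot snapshot() {
        return new Snapshot(replayFrom(new Snapshot(0, 0)), log.size());
    }

    public static void main(String[] args) {
        EventSourcedAccount account = new EventSourcedAccount();
        account.append(new Deposited(100));
        account.append(new Withdrawn(30));
        Snapshot snap = account.snapshot(); // balance 70 at version 2
        account.append(new Deposited(5));
        System.out.println(account.replayFrom(new Snapshot(0, 0))); // 75 (full replay)
        System.out.println(account.replayFrom(snap));               // 75 (from snapshot)
    }
}
```

Both replays reach the same state; the snapshot only reduces how many events must be re-applied.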
Stream-Table Duality:
- Streams as tables
- Tables as streams
- Materialized views
- Changelog streams
- Compacted topics
- State stores
- Stream-table joins
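The duality can be made concrete with plain collections: folding a changelog yields a table, diffing two versions of a table yields a changelog, and compaction keeps just enough of the changelog to rebuild the same table. The types and method names below are hypothetical, and compaction is idealized: Kafka compacts lazily and keeps delete tombstones for a retention period.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Stream-table duality with hypothetical types (Java 16+); a null value is a delete (tombstone).
final class Duality {

    record Change(String key, String value) {}

    // Stream -> table: fold the changelog, keeping the latest value per key.
    static Map<String, String> materialize(List<Change> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Change c : changelog) {
            if (c.value() == null) {
                table.remove(c.key());
            } else {
                table.put(c.key(), c.value());
            }
        }
        return table;
    }

    // Table -> stream: the changes that turn `before` into `after`.
    static List<Change> changes(Map<String, String> before, Map<String, String> after) {
        List<Change> out = new ArrayList<>();
        for (Map.Entry<String, String> e : after.entrySet()) {
            if (!Objects.equals(before.get(e.getKey()), e.getValue())) {
                out.add(new Change(e.getKey(), e.getValue())); // insert or update
            }
        }
        for (String key : before.keySet()) {
            if (!after.containsKey(key)) {
                out.add(new Change(key, null)); // deletion
            }
        }
        return out;
    }

    // Idealized compaction: only the latest record per key survives, tombstones included.
    static List<Change> compact(List<Change> changelog) {
        Map<String, Change> latest = new LinkedHashMap<>();
        for (Change c : changelog) {
            latest.remove(c.key()); // re-insert so order reflects the latest write
            latest.put(c.key(), c);
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Change> log = List.of(new Change("a", "1"), new Change("b", "2"),
                new Change("a", "3"), new Change("b", null), new Change("c", "4"));
        Map<String, String> table = materialize(log);
        System.out.println(table);                                   // {a=3, c=4}
        System.out.println(materialize(compact(log)).equals(table)); // true
        System.out.println(changes(table, Map.of("a", "3")));        // key c deleted
    }
}
```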
Example Stream-Table Join (Kafka Streams):
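// Kafka Streams stream-table join example
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// customerSerde, orderSerde and enrichedOrderSerde are application-defined Serdes
StreamsBuilder builder = new StreamsBuilder();

// Create a KTable, keyed by customer ID, from a compacted topic
KTable<String, Customer> customers = builder.table(
    "customers",
    Consumed.with(Serdes.String(), customerSerde),
    Materialized.as("customers-store")
);

// Create a KStream, keyed by order ID, from an event topic
KStream<String, Order> orders = builder.stream(
    "orders",
    Consumed.with(Serdes.String(), orderSerde)
);

// A KStream-KTable join matches on the record key, so re-key each order by its
// customer ID (assumed to be a String); Kafka Streams repartitions before joining.
// (Joining against a GlobalKTable instead accepts a key-extractor function.)
KStream<String, EnrichedOrder> enrichedOrders = orders
    .selectKey((orderId, order) -> order.getCustomerId())
    .join(
        customers,
        (order, customer) -> new EnrichedOrder(order, customer),
        Joined.with(Serdes.String(), orderSerde, customerSerde)
    );

// Write the enriched orders, now keyed by customer ID, to an output topic
enrichedOrders.to(
    "enriched-orders",
    Produced.with(Serdes.String(), enrichedOrderSerde)
);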
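The semantics of a stream-table inner join can be modeled without Kafka: each stream record is matched against the table's current value for its key, records with no match are dropped, and a later table update does not re-emit earlier results. This single-partition sketch uses hypothetical types and ignores how Kafka Streams decides which input topic to read next.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Models KStream-KTable inner-join semantics on one partition (Java 16+, hypothetical types):
// each stream record sees the table value for its key at the moment it is processed.
final class StreamTableJoin {

    // Either a table update or a stream record, in processing order.
    record Input(boolean isTableUpdate, String key, String value) {}

    static List<String> run(List<Input> inputs) {
        Map<String, String> table = new HashMap<>();
        List<String> joined = new ArrayList<>();
        for (Input in : inputs) {
            if (in.isTableUpdate()) {
                table.put(in.key(), in.value()); // updates the table only; emits nothing
            } else {
                String match = table.get(in.key());
                if (match != null) {             // inner join: unmatched records are dropped
                    joined.add(in.value() + "+" + match);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Input> inputs = List.of(
                new Input(false, "c1", "order1"), // no customer yet: dropped
                new Input(true, "c1", "Ann"),
                new Input(false, "c1", "order2"),
                new Input(true, "c1", "Anne"),    // does not re-emit order2
                new Input(false, "c1", "order3"));
        System.out.println(run(inputs)); // [order2+Ann, order3+Anne]
    }
}
```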