Event Processing and Projections
In event sourcing, projections are read models built by consuming events and updating query-friendly data structures. They transform the event stream into optimized views for specific query needs.
Projection Fundamentals
Projections have several key characteristics:
- Derived Data: They contain no original data, only derived views of events
- Rebuildable: They can be rebuilt from scratch by replaying events
- Query-Optimized: They are structured for specific query patterns
- Eventually Consistent: They may lag slightly behind the write side
Let’s implement a simple projection system:
package eventsourcing
import (
"context"
"fmt"
"sync"
)
// Projection defines the interface for a read model projection.
//
// ProjectionManager keeps implementations in a map keyed by Name, forwards
// events to HandleEvent, and calls Reset followed by a replay of stored
// events when a projection is rebuilt.
type Projection interface {
// Name returns the unique name of the projection. ProjectionManager uses
// it as the registry key, so registering a second projection with the same
// name replaces the first.
Name() string
// HandleEvent processes an event and updates the projection. When it
// returns a non-nil error, the manager stops its dispatch or rebuild loop
// and returns that error to its caller wrapped with context.
HandleEvent(ctx context.Context, event Event) error
// Reset clears the projection state. RebuildProjection calls it once
// before replaying events into the projection.
Reset(ctx context.Context) error
}
// ProjectionManager manages a set of projections: it dispatches events to
// every registered projection and rebuilds projections by replaying the
// events held in the event store.
//
// It contains a sync.RWMutex, so it must be used through a pointer and not
// copied after first use.
type ProjectionManager struct {
// eventStore is the source of the events replayed during a rebuild.
eventStore EventStore
// projections maps each projection's Name() to the projection itself.
projections map[string]Projection
// mu guards projections.
mu sync.RWMutex
}
// NewProjectionManager returns a ProjectionManager that replays events from
// eventStore and starts out with an empty, ready-to-use projection registry.
func NewProjectionManager(eventStore EventStore) *ProjectionManager {
	manager := new(ProjectionManager)
	manager.eventStore = eventStore
	manager.projections = map[string]Projection{}
	return manager
}
// RegisterProjection adds projection to the manager under the key returned by
// projection.Name(). A projection previously registered under the same name
// is replaced. The registry is updated while holding the write lock.
func (m *ProjectionManager) RegisterProjection(projection Projection) {
	m.mu.Lock()
	defer m.mu.Unlock()

	key := projection.Name()
	m.projections[key] = projection
}
// HandleEvent delivers event to every registered projection while holding the
// manager's read lock. Projections are visited in map iteration order. The
// first projection that fails stops the dispatch, and its error is returned
// wrapped together with that projection's name; projections not yet visited
// do not receive the event.
func (m *ProjectionManager) HandleEvent(ctx context.Context, event Event) error {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, p := range m.projections {
		err := p.HandleEvent(ctx, event)
		if err == nil {
			continue
		}
		return fmt.Errorf("projection %s failed to handle event: %w", p.Name(), err)
	}

	return nil
}
// RebuildProjection rebuilds a specific projection from scratch.
//
// It looks the projection up by projectionName, calls its Reset method, and
// then replays every event from the event store into it in pages of 1000
// events. The offset advances by a full page each round, and a page shorter
// than the page size marks the end of the stream.
//
// The context is checked before the projection is reset and again before
// each page is fetched, so a cancelled or expired context stops the rebuild
// promptly even when the event store or the projection ignores it. When the
// context is already done on entry, the projection is left untouched rather
// than being wiped without a replay.
//
// The manager's lock is held only for the lookup, so HandleEvent may still
// deliver live events to the same projection while the replay is running.
//
// It returns an error when the projection is not registered, when the
// context is done, or when Reset, the event store, or the projection's
// HandleEvent fails. Underlying errors are wrapped with %w.
func (m *ProjectionManager) RebuildProjection(ctx context.Context, projectionName string) error {
	m.mu.RLock()
	projection, exists := m.projections[projectionName]
	m.mu.RUnlock()
	if !exists {
		return fmt.Errorf("projection %s not found", projectionName)
	}

	// Do not clear the read model if the replay cannot even start.
	if err := ctx.Err(); err != nil {
		return fmt.Errorf("rebuilding projection %s: %w", projectionName, err)
	}

	// Reset the projection so the replay starts from an empty state.
	if err := projection.Reset(ctx); err != nil {
		return fmt.Errorf("failed to reset projection: %w", err)
	}

	const batchSize = 1000
	for offset := 0; ; offset += batchSize {
		// Honour cancellation between pages regardless of whether the store
		// or the projection look at ctx themselves.
		if err := ctx.Err(); err != nil {
			return fmt.Errorf("rebuilding projection %s at offset %d: %w", projectionName, offset, err)
		}

		events, err := m.eventStore.GetAllEvents(ctx, offset, batchSize)
		if err != nil {
			return fmt.Errorf("failed to get events: %w", err)
		}

		// Replay the page in the order the store returned it.
		for _, event := range events {
			if err := projection.HandleEvent(ctx, event); err != nil {
				return fmt.Errorf("failed to handle event during rebuild: %w", err)
			}
		}

		// A short page means the store has no more events.
		if len(events) < batchSize {
			return nil
		}
	}
}
// RebuildAllProjections rebuilds every registered projection, one after the
// other, by calling RebuildProjection for each registered name. The names are
// snapshotted under the read lock first, so the lock is not held while the
// rebuilds run. The order follows map iteration order. The first failing
// rebuild aborts the run and its error is returned unchanged.
func (m *ProjectionManager) RebuildAllProjections(ctx context.Context) error {
	m.mu.RLock()
	names := make([]string, 0, len(m.projections))
	for key := range m.projections {
		names = append(names, key)
	}
	m.mu.RUnlock()

	for i := range names {
		err := m.RebuildProjection(ctx, names[i])
		if err != nil {
			return err
		}
	}

	return nil
}
Product Catalog Projection
Let’s implement a concrete projection for our e-commerce product catalog:
package ecommerce
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/yourdomain/eventsourcing"
)
// ProductCatalogProjection maintains a read-optimized view of products in the
// product_catalog table of a SQL database.
type ProductCatalogProjection struct {
// db is the database handle all of the projection's statements run against.
db *sql.DB
}
// NewProductCatalogProjection returns a product catalog projection that runs
// its statements against db. It does not touch the database itself.
func NewProductCatalogProjection(db *sql.DB) *ProductCatalogProjection {
	projection := new(ProductCatalogProjection)
	projection.db = db
	return projection
}
// Name returns the fixed identifier of this projection, "product_catalog".
func (p *ProductCatalogProjection) Name() string {
	const projectionName = "product_catalog"
	return projectionName
}
// Initialize creates the product_catalog table and its name and price indexes
// if they don't exist yet. All three statements are sent to the database in a
// single ExecContext call, and any error from that call is returned as-is.
func (p *ProductCatalogProjection) Initialize(ctx context.Context) error {
	const schema = `
CREATE TABLE IF NOT EXISTS product_catalog (
product_id VARCHAR(255) PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description TEXT,
price DECIMAL(10, 2) NOT NULL,
stock_level INT NOT NULL,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_product_catalog_name ON product_catalog(name);
CREATE INDEX IF NOT EXISTS idx_product_catalog_price ON product_catalog(price);
`
	if _, err := p.db.ExecContext(ctx, schema); err != nil {
		return err
	}
	return nil
}
// HandleEvent routes event to the handler matching its EventType:
// "ProductCreated", "ProductPriceChanged" or "ProductStockAdjusted". Events of
// any other type are ignored and reported as success.
func (p *ProductCatalogProjection) HandleEvent(ctx context.Context, event eventsourcing.Event) error {
	var handle func(context.Context, eventsourcing.Event) error

	switch event.EventType {
	case "ProductCreated":
		handle = p.handleProductCreated
	case "ProductPriceChanged":
		handle = p.handleProductPriceChanged
	case "ProductStockAdjusted":
		handle = p.handleProductStockAdjusted
	default:
		// Not an event this projection cares about.
		return nil
	}

	return handle(ctx, event)
}
// Reset empties the projection by deleting every row from product_catalog.
// The table itself is kept. Any database error is returned as-is.
func (p *ProductCatalogProjection) Reset(ctx context.Context) error {
	const wipe = "DELETE FROM product_catalog"
	if _, err := p.db.ExecContext(ctx, wipe); err != nil {
		return err
	}
	return nil
}
// handleProductCreated processes a ProductCreated event.
//
// It decodes event.Data as JSON into a ProductCreated payload and inserts one
// row into product_catalog. The row is stored with is_active set to TRUE, and
// both created_at and updated_at are taken from event.Timestamp.
//
// It returns an error when the payload cannot be decoded or the insert fails.
// Both errors are wrapped with %w so the cause stays reachable through
// errors.Is and errors.As while the message says which step failed.
func (p *ProductCatalogProjection) handleProductCreated(ctx context.Context, event eventsourcing.Event) error {
	var data ProductCreated
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return fmt.Errorf("decoding ProductCreated payload: %w", err)
	}

	// $6 is used twice on purpose: a new product's creation and last-update
	// times are the same instant.
	_, err := p.db.ExecContext(ctx, `
INSERT INTO product_catalog (
product_id, name, description, price, stock_level, is_active, created_at, updated_at
) VALUES (
$1, $2, $3, $4, $5, TRUE, $6, $6
)
`, data.ProductID, data.Name, data.Description, data.Price, data.StockLevel, event.Timestamp)
	if err != nil {
		return fmt.Errorf("inserting product %v into product_catalog: %w", data.ProductID, err)
	}
	return nil
}
// handleProductPriceChanged processes a ProductPriceChanged event.
//
// It decodes event.Data as JSON into a ProductPriceChanged payload and sets
// the price of the matching product_catalog row to data.NewPrice, stamping
// updated_at with event.Timestamp. The number of affected rows is not
// inspected, so an event for a product that has no row is not an error.
//
// It returns an error when the payload cannot be decoded or the update fails.
// Both errors are wrapped with %w so the cause stays reachable through
// errors.Is and errors.As while the message says which step failed.
func (p *ProductCatalogProjection) handleProductPriceChanged(ctx context.Context, event eventsourcing.Event) error {
	var data ProductPriceChanged
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return fmt.Errorf("decoding ProductPriceChanged payload: %w", err)
	}

	_, err := p.db.ExecContext(ctx, `
UPDATE product_catalog
SET price = $1, updated_at = $2
WHERE product_id = $3
`, data.NewPrice, event.Timestamp, data.ProductID)
	if err != nil {
		return fmt.Errorf("updating price of product %v: %w", data.ProductID, err)
	}
	return nil
}
// handleProductStockAdjusted processes a ProductStockAdjusted event.
//
// It decodes event.Data as JSON into a ProductStockAdjusted payload and sets
// the stock_level of the matching product_catalog row to data.NewStockLevel,
// stamping updated_at with event.Timestamp. The number of affected rows is
// not inspected, so an event for a product that has no row is not an error.
//
// It returns an error when the payload cannot be decoded or the update fails.
// Both errors are wrapped with %w so the cause stays reachable through
// errors.Is and errors.As while the message says which step failed.
func (p *ProductCatalogProjection) handleProductStockAdjusted(ctx context.Context, event eventsourcing.Event) error {
	var data ProductStockAdjusted
	if err := json.Unmarshal(event.Data, &data); err != nil {
		return fmt.Errorf("decoding ProductStockAdjusted payload: %w", err)
	}

	_, err := p.db.ExecContext(ctx, `
UPDATE product_catalog
SET stock_level = $1, updated_at = $2
WHERE product_id = $3
`, data.NewStockLevel, event.Timestamp, data.ProductID)
	if err != nil {
		return fmt.Errorf("updating stock level of product %v: %w", data.ProductID, err)
	}
	return nil
}