Reproducibility

Ensuring consistent model behavior:

Reproducibility Challenges:

  • Non-deterministic algorithms
  • Changing data sources
  • Environment dependencies
  • Random initializations
  • Hardware variations
  • Library version changes

Reproducibility Best Practices:

  • Set and log random seeds
  • Version control all code
  • Version and hash datasets
  • Use containerized environments
  • Lock dependency versions (see the environment snapshot sketch after this list)
  • Document hardware requirements
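
Pinning dependencies in a lock file is the primary control; recording what was actually installed for each run is a cheap complement. A minimal sketch using only the standard library (the output file name is an illustrative choice):

# Snapshot the installed environment for a training run
import importlib.metadata
import json
import sys

def log_environment(out_path="environment_snapshot.json"):
    """Record the Python and package versions actually installed for this run."""
    snapshot = {
        "python_version": sys.version,
        "packages": {
            dist.metadata["Name"]: dist.version
            for dist in importlib.metadata.distributions()
        },
    }
    with open(out_path, "w") as f:
        json.dump(snapshot, f, indent=2, sort_keys=True)
    return snapshot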

Example Reproducible Training Script:

# Reproducible training script
import numpy as np
import tensorflow as tf
import random
import os

# Set seeds for reproducibility
def set_seeds(seed=42):
    # Note: PYTHONHASHSEED only affects hash randomization if it is set
    # before the Python process starts; it is recorded here for completeness
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    
    # Request deterministic ops in TensorFlow 2.x; newer releases also
    # provide tf.config.experimental.enable_op_determinism()
    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'
    
    print(f"Random seed set to {seed}")
    return seed

# Main training function
def train_model(config):
    # Set seeds
    seed = set_seeds(config.get("seed", 42))
    
    # Load data with version hash check
    data = load_data(config["data_path"])
    
    # Prepare data
    X_train, X_test, y_train, y_test = prepare_data(data, config["test_size"], seed)
    
    # Build model
    model = build_model(config["model_params"])
    
    # Train model
    history = model.fit(
        X_train, y_train,
        validation_data=(X_test, y_test),
        epochs=config["epochs"],
        batch_size=config["batch_size"]
    )
    
    # Evaluate model
    results = model.evaluate(X_test, y_test)
    
    # Save model and configuration
    save_artifacts(model, config, history, results)
    
    return model, history, results
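
The load_data call above is assumed to verify the dataset against a hash recorded when the data was versioned. A minimal sketch of that check, assuming a single CSV file and a small JSON manifest (the manifest name and fields are illustrative):

# Dataset hash check used by load_data above
import hashlib
import json

import pandas as pd

def file_sha256(path, chunk_size=8192):
    """Compute the SHA-256 digest of a file without loading it into memory."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def load_data(data_path, manifest_path="data_manifest.json"):
    """Load the dataset only if it matches the hash recorded at versioning time."""
    with open(manifest_path) as f:
        expected_hash = json.load(f)["sha256"]

    actual_hash = file_sha256(data_path)
    if actual_hash != expected_hash:
        raise ValueError(
            f"Dataset hash mismatch: expected {expected_hash}, got {actual_hash}"
        )
    return pd.read_csv(data_path)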

Feature Engineering and Feature Stores

Managing features for ML models:

Feature Engineering Best Practices:

  • Create reusable transformation pipelines (see the sketch after this list)
  • Implement feature validation
  • Document feature definitions
  • Test feature stability over time
  • Handle missing values consistently
  • Address feature drift
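
A minimal sketch of a reusable transformation pipeline with scikit-learn (the column names are illustrative). Because the same fitted object is applied at training and serving time, missing values and encodings are handled consistently in both places:

# Reusable preprocessing pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

numeric_features = ["age", "tenure", "monthly_charges"]
categorical_features = ["contract_type", "payment_method"]

preprocessor = ColumnTransformer(transformers=[
    ("numeric", Pipeline([
        ("impute", SimpleImputer(strategy="median")),    # consistent missing-value handling
        ("scale", StandardScaler()),
    ]), numeric_features),
    ("categorical", Pipeline([
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("encode", OneHotEncoder(handle_unknown="ignore")),
    ]), categorical_features),
])

# Fit once on training data, then reuse the same fitted object everywhere:
# X_train_prepared = preprocessor.fit_transform(X_train)
# X_serving_prepared = preprocessor.transform(X_serving)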

Feature Store Components:

  • Feature registry and catalog
  • Offline feature storage
  • Online feature serving
  • Feature versioning
  • Transformation pipelines
  • Monitoring and validation

Example Feature Store Usage:

# Feature store example with Feast
from datetime import datetime

import pandas as pd
from feast import FeatureStore

# Initialize the feature store
store = FeatureStore(repo_path="./feature_repo")

# Entity dataframe: entity keys plus the event timestamps used for
# point-in-time joins against the offline store
entity_df = pd.DataFrame({
    "customer_id": ["1234", "5678"],
    "event_timestamp": [datetime(2024, 1, 1), datetime(2024, 2, 1)],
})

# Get training data for a model
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "customer_features:age",
        "customer_features:total_purchases",
        "transaction_features:purchase_amount_7d_avg"
    ],
).to_df()

# Train the model
model = train_model(training_df)

# Get online features for prediction
features = store.get_online_features(
    features=[
        "customer_features:age",
        "customer_features:total_purchases",
        "transaction_features:purchase_amount_7d_avg"
    ],
    entity_rows=[{"customer_id": "1234"}]
).to_dict()

# Convert the online feature dict (lists keyed by feature name) into a
# single-row DataFrame the model can consume, dropping the entity key column
features_df = pd.DataFrame.from_dict(features).drop(columns=["customer_id"])

# Make prediction
prediction = model.predict(features_df)

Feature Store Benefits:

  • Consistent features across training and serving
  • Reduced feature duplication
  • Improved feature discovery and reuse
  • Point-in-time correctness
  • Efficient online serving
  • Feature lineage tracking

Model Deployment and Serving

Model Packaging

Preparing models for deployment:

Model Packaging Options:

  • Docker containers
  • Python packages
  • Serialized model files (pickle/joblib; see the sketch after this list)
  • ONNX format
  • TensorFlow SavedModel
  • PyTorch TorchScript
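
Whatever the packaging option, the trained model is first written out as a versioned artifact. A minimal sketch, assuming a scikit-learn model saved with joblib next to a small metadata file (the directory layout, metadata fields, and feature names are illustrative, chosen to match the Dockerfile below):

# Serialize a trained model plus minimal metadata
import json
from pathlib import Path

import joblib

def save_model_artifact(model, version, out_dir="model"):
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)

    # Serialized model file, loaded later by the serving code
    joblib.dump(model, out / "model.pkl")

    # Explicit version and input/output specification travel with the artifact
    metadata = {
        "model_version": version,
        "framework": "scikit-learn",
        "input_features": ["age", "tenure", "monthly_charges", "total_charges"],
        "output": "churn probability",
    }
    (out / "metadata.json").write_text(json.dumps(metadata, indent=2))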

Example Model Packaging with Docker:

# Dockerfile for model serving
FROM python:3.9-slim

WORKDIR /app

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model artifacts and code
COPY model/ ./model/
COPY src/ ./src/

# Set environment variables
ENV MODEL_PATH=/app/model/model.pkl
ENV MODEL_VERSION=1.0.0

# Expose port for API
EXPOSE 8000

# Run the API server
CMD ["uvicorn", "src.api:app", "--host", "0.0.0.0", "--port", "8000"]

Model Packaging Best Practices:

  • Include all dependencies
  • Version models explicitly
  • Document input/output specifications
  • Include preprocessing code
  • Optimize for inference
  • Test packaged models
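
A minimal smoke test sketch for a packaged model, assuming the container built from the Dockerfile above is running locally on port 8000 and exposes the /predict endpoint shown in the next section (the payload values are illustrative):

# Smoke test against a locally running packaged model
import requests

def test_packaged_model(base_url="http://localhost:8000"):
    payload = {
        "features": {
            "age": 42.0,
            "tenure": 12.0,
            "monthly_charges": 70.5,
            "total_charges": 846.0,
        },
        "request_id": "smoke-test-1",
    }

    response = requests.post(f"{base_url}/predict", json=payload, timeout=5)
    response.raise_for_status()

    body = response.json()
    assert 0.0 <= body["probability"] <= 1.0
    assert body["prediction_label"] in {"Churn", "No Churn"}
    print("Smoke test passed:", body)

if __name__ == "__main__":
    test_packaged_model()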

Deployment Patterns

Strategies for deploying ML models:

Common Deployment Patterns:

  • REST API endpoints
  • Batch prediction jobs (a batch sketch follows the API example below)
  • Real-time streaming
  • Edge deployment
  • Embedded models
  • Serverless functions

Example FastAPI Model Serving:

# FastAPI model serving
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import joblib
import numpy as np
import os
import time
from typing import Dict, Optional

# Initialize FastAPI app
app = FastAPI(title="Churn Prediction Model API")

# Load model at startup
model = None

@app.on_event("startup")
async def load_model():
    global model
    # Model path comes from the container environment (see the Dockerfile above)
    model = joblib.load(os.getenv("MODEL_PATH", "./model/churn_model.pkl"))

# Define request and response models
class PredictionRequest(BaseModel):
    features: Dict[str, float]
    request_id: Optional[str] = None

class PredictionResponse(BaseModel):
    prediction: float
    probability: float
    prediction_label: str
    model_version: str
    request_id: Optional[str] = None
    processing_time_ms: float

# Prediction endpoint
@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    start_time = time.time()
    
    if model is None:
        raise HTTPException(status_code=503, detail="Model not loaded")
    
    try:
        # Extract features
        feature_names = ['age', 'tenure', 'monthly_charges', 'total_charges']
        features = np.array([request.features.get(name, 0) for name in feature_names]).reshape(1, -1)
        
        # Make prediction
        probability = model.predict_proba(features)[0, 1]
        prediction = int(probability >= 0.5)
        prediction_label = "Churn" if prediction == 1 else "No Churn"
        
        # Calculate processing time
        processing_time = (time.time() - start_time) * 1000
        
        # Return response
        return PredictionResponse(
            prediction=float(prediction),
            probability=float(probability),
            prediction_label=prediction_label,
            model_version=os.getenv("MODEL_VERSION", "1.0.0"),
            request_id=request.request_id,
            processing_time_ms=processing_time
        )
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Prediction error: {str(e)}")
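
The endpoint above implements the REST API pattern; the batch prediction pattern is usually a scheduled job that scores an entire table in one pass. A minimal sketch, assuming a CSV of feature rows and the same joblib-serialized model (paths and column names are illustrative):

# Batch prediction job
import joblib
import pandas as pd

def run_batch_predictions(input_path, output_path, model_path="./model/churn_model.pkl"):
    model = joblib.load(model_path)

    # Score the whole table at once rather than one request at a time
    df = pd.read_csv(input_path)
    feature_cols = ["age", "tenure", "monthly_charges", "total_charges"]
    df["churn_probability"] = model.predict_proba(df[feature_cols])[:, 1]
    df["churn_prediction"] = (df["churn_probability"] >= 0.5).astype(int)

    df.to_csv(output_path, index=False)
    return len(df)

if __name__ == "__main__":
    n = run_batch_predictions("customers_to_score.csv", "churn_scores.csv")
    print(f"Scored {n} customers")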

Deployment Considerations:

  • Latency requirements
  • Throughput needs
  • Resource constraints
  • Scaling patterns
  • Batch vs. real-time
  • Edge vs. cloud