Database Integration and SQL for Data Science

When your datasets grow beyond what fits comfortably in memory, databases become essential. But databases aren’t just for storage: they’re powerful analytical engines that can often filter, aggregate, and join data faster than pulling everything into pandas first. Learning to leverage database capabilities transforms how you approach large-scale data analysis.

The key insight is knowing when to process data in the database versus when to pull it into Python. Database engines excel at filtering, aggregating, and joining large datasets, while Python excels at complex transformations and machine learning.

Database Connections and Configuration

Establishing reliable database connections is the foundation of database-driven analysis. Different databases require different connection approaches, but the patterns are similar across systems.

import pandas as pd
import sqlite3
import sqlalchemy as sa
from sqlalchemy import create_engine, text
import numpy as np
from contextlib import contextmanager
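
Before building the sample database, it helps to see how little changes between backends: with SQLAlchemy, the connection URL (plus the installed driver) is usually the only difference. The hosts, credentials, and database names below are placeholders.

# SQLAlchemy connection URLs for common backends (placeholder hosts and credentials)
sqlite_url = 'sqlite:///local_file.db'                                        # file on disk, no server
postgres_url = 'postgresql+psycopg2://user:password@db-host:5432/analytics'   # requires psycopg2
mysql_url = 'mysql+pymysql://user:password@db-host:3306/analytics'            # requires PyMySQL

# create_engine() is lazy: it validates the URL but only connects on first use,
# e.g. engine = create_engine(postgres_url)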

# SQLite for local development and examples
def create_sample_database():
    """Create sample database with realistic data."""
    
    # Create in-memory SQLite database
    engine = create_engine('sqlite:///sample_data.db')
    
    # Generate sample sales data
    np.random.seed(42)
    n_customers = 1000
    n_products = 50
    n_orders = 5000
    
    # Customers table
    customers = pd.DataFrame({
        'customer_id': range(1, n_customers + 1),
        'name': [f'Customer_{i}' for i in range(1, n_customers + 1)],
        'email': [f'customer_{i}@email.com' for i in range(1, n_customers + 1)],
        'city': np.random.choice(['New York', 'London', 'Tokyo', 'Paris'], n_customers),
        'signup_date': pd.date_range('2020-01-01', periods=n_customers, freq='D')
    })
    
    # Products table
    products = pd.DataFrame({
        'product_id': range(1, n_products + 1),
        'product_name': [f'Product_{i}' for i in range(1, n_products + 1)],
        'category': np.random.choice(['Electronics', 'Clothing', 'Books'], n_products),
        'price': np.random.uniform(10, 500, n_products).round(2)
    })
    
    # Orders table
    orders = pd.DataFrame({
        'order_id': range(1, n_orders + 1),
        'customer_id': np.random.randint(1, n_customers + 1, n_orders),
        'product_id': np.random.randint(1, n_products + 1, n_orders),
        'quantity': np.random.randint(1, 5, n_orders),
        'order_date': pd.date_range('2020-01-01', '2023-12-31', periods=n_orders)
    })
    
    # Write to database
    customers.to_sql('customers', engine, if_exists='replace', index=False)
    products.to_sql('products', engine, if_exists='replace', index=False)
    orders.to_sql('orders', engine, if_exists='replace', index=False)
    
    return engine

# Create sample database
engine = create_sample_database()
print("Sample database created with tables: customers, products, orders")

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) as count FROM customers"))
    customer_count = result.fetchone()[0]
    print(f"Database contains {customer_count} customers")

SQL Queries for Data Analysis

SQL excels at filtering, aggregating, and joining large datasets. Learning to write efficient analytical queries reduces the amount of data you need to transfer to Python and can dramatically improve performance.

# Analytical SQL queries
analytical_queries = {
    'customer_summary': """
        SELECT 
            c.city,
            COUNT(DISTINCT c.customer_id) as customer_count,
            COUNT(o.order_id) as total_orders,
            SUM(o.quantity * p.price) as total_revenue,
            AVG(o.quantity * p.price) as avg_order_value
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id
        LEFT JOIN products p ON o.product_id = p.product_id
        GROUP BY c.city
        ORDER BY total_revenue DESC
    """,
    
    'monthly_trends': """
        SELECT 
            strftime('%Y-%m', o.order_date) as month,
            p.category,
            SUM(o.quantity * p.price) as revenue,
            COUNT(o.order_id) as order_count,
            AVG(o.quantity * p.price) as avg_order_value
        FROM orders o
        JOIN products p ON o.product_id = p.product_id
        WHERE o.order_date >= '2023-01-01'
        GROUP BY month, p.category
        ORDER BY month, revenue DESC
    """,
    
    'customer_cohorts': """
        SELECT 
            strftime('%Y-%m', c.signup_date) as cohort_month,
            COUNT(DISTINCT c.customer_id) as cohort_size,
            COUNT(DISTINCT CASE WHEN o.order_date IS NOT NULL THEN c.customer_id END) as active_customers,
            ROUND(
                COUNT(DISTINCT CASE WHEN o.order_date IS NOT NULL THEN c.customer_id END) * 100.0 / 
                COUNT(DISTINCT c.customer_id), 2
            ) as activation_rate
        FROM customers c
        LEFT JOIN orders o ON c.customer_id = o.customer_id 
            AND o.order_date BETWEEN c.signup_date AND date(c.signup_date, '+30 days')
        GROUP BY cohort_month
        ORDER BY cohort_month
    """
}

# Execute analytical queries
results = {}
for query_name, query in analytical_queries.items():
    df = pd.read_sql_query(query, engine)
    results[query_name] = df
    
    print(f"\n{query_name.replace('_', ' ').title()}:")
    print(df.head())
    print(f"Shape: {df.shape}")

Pandas and SQL Integration

Pandas integrates seamlessly with SQL databases, allowing you to combine the power of SQL for data retrieval with pandas for complex transformations and analysis.

# Advanced pandas-SQL integration
class DatabaseAnalyzer:
    """Class for database-driven analysis with pandas integration."""
    
    def __init__(self, engine):
        self.engine = engine
    
    def query_to_dataframe(self, query: str, params: dict = None) -> pd.DataFrame:
        """Execute SQL query and return pandas DataFrame."""
        return pd.read_sql_query(query, self.engine, params=params)
    
    def chunked_query(self, query: str, chunksize: int = 10000) -> pd.DataFrame:
        """Process large queries in chunks to manage memory."""
        chunks = []
        for chunk in pd.read_sql_query(query, self.engine, chunksize=chunksize):
            # Process each chunk (e.g., apply transformations)
            processed_chunk = self.process_chunk(chunk)
            chunks.append(processed_chunk)
        
        # Guard against queries that return no rows
        if not chunks:
            return pd.DataFrame()
        
        return pd.concat(chunks, ignore_index=True)
    
    def process_chunk(self, chunk: pd.DataFrame) -> pd.DataFrame:
        """Process individual chunks of data."""
        # Example processing: calculate derived metrics
        if 'order_date' in chunk.columns:
            chunk['order_date'] = pd.to_datetime(chunk['order_date'])
            chunk['day_of_week'] = chunk['order_date'].dt.day_name()
            chunk['month'] = chunk['order_date'].dt.month
        
        return chunk
    
    def get_customer_analysis(self, city: str = None) -> pd.DataFrame:
        """Get customer analysis with optional city filter."""
        
        base_query = """
            SELECT 
                c.customer_id,
                c.name,
                c.city,
                c.signup_date,
                COUNT(o.order_id) as total_orders,
                SUM(o.quantity * p.price) as total_spent,
                MAX(o.order_date) as last_order_date
            FROM customers c
            LEFT JOIN orders o ON c.customer_id = o.customer_id
            LEFT JOIN products p ON o.product_id = p.product_id
        """
        
        if city:
            # SQLite accepts named-style (:name) bind parameters with a dict of params
            query = base_query + " WHERE c.city = :city"
            params = {'city': city}
        else:
            query = base_query
            params = None
        
        query += """
            GROUP BY c.customer_id, c.name, c.city, c.signup_date
            ORDER BY total_spent DESC
        """
        
        df = self.query_to_dataframe(query, params)
        
        # Add pandas-based calculations
        df['signup_date'] = pd.to_datetime(df['signup_date'])
        df['last_order_date'] = pd.to_datetime(df['last_order_date'])
        df['days_since_signup'] = (pd.Timestamp.now() - df['signup_date']).dt.days
        df['days_since_last_order'] = (pd.Timestamp.now() - df['last_order_date']).dt.days
        
        # Customer segmentation (include_lowest so zero-spend customers fall into 'Low')
        df['customer_segment'] = pd.cut(
            df['total_spent'].fillna(0),
            bins=[0, 100, 500, 1000, float('inf')],
            labels=['Low', 'Medium', 'High', 'VIP'],
            include_lowest=True
        )
        
        return df

# Use the analyzer
analyzer = DatabaseAnalyzer(engine)

# Get customer analysis for specific city
tokyo_customers = analyzer.get_customer_analysis(city='Tokyo')
print("Tokyo Customer Analysis:")
print(tokyo_customers.head())

# Analyze customer segments
segment_analysis = tokyo_customers.groupby('customer_segment').agg({
    'customer_id': 'count',
    'total_spent': ['mean', 'sum'],
    'total_orders': 'mean',
    'days_since_last_order': 'mean'
}).round(2)

print("\nCustomer Segment Analysis:")
print(segment_analysis)
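
The chunked_query method defined above has not been exercised yet; here is a quick illustration using an order-detail join as the stand-in "large" query. Each chunk passes through process_chunk, so the result already carries the derived day_of_week and month columns.

# Chunked retrieval through the analyzer (the query here is just an example join)
order_detail_query = """
    SELECT o.order_id, o.customer_id, o.order_date, o.quantity, p.price
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
"""

order_details = analyzer.chunked_query(order_detail_query, chunksize=1000)
print(f"\nChunked query returned {len(order_details)} rows")
print(order_details[['order_id', 'order_date', 'day_of_week', 'month']].head())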

Performance Optimization Techniques

Database performance becomes critical when working with large datasets. Understanding indexing, query optimization, and data partitioning helps you build efficient analytical workflows.

# Database performance optimization
def optimize_database_performance(engine):
    """Apply performance optimizations to the database."""
    
    optimization_queries = [
        # Create indexes for common query patterns
        "CREATE INDEX IF NOT EXISTS idx_orders_customer_id ON orders(customer_id)",
        "CREATE INDEX IF NOT EXISTS idx_orders_product_id ON orders(product_id)",
        "CREATE INDEX IF NOT EXISTS idx_orders_date ON orders(order_date)",
        "CREATE INDEX IF NOT EXISTS idx_customers_city ON customers(city)",
        
        # Analyze tables for query optimization (SQLite specific)
        "ANALYZE customers",
        "ANALYZE products", 
        "ANALYZE orders"
    ]
    
    for query in optimization_queries:
        try:
            # engine.begin() opens a transaction and commits on success, so the
            # DDL persists (with plain connect(), SQLAlchemy 2.x rolls back on close)
            with engine.begin() as conn:
                conn.execute(text(query))
            print(f"Executed: {query}")
        except Exception as e:
            print(f"Failed to execute {query}: {e}")

# Apply optimizations
optimize_database_performance(engine)

# Measure query performance after optimization
def measure_query_performance(engine, query: str, iterations: int = 5):
    """Measure average execution time for a query over several runs."""
    
    import time
    
    times = []
    for i in range(iterations):
        start_time = time.time()
        
        with engine.connect() as conn:
            result = conn.execute(text(query))
            rows = result.fetchall()
        
        end_time = time.time()
        times.append(end_time - start_time)
    
    avg_time = sum(times) / len(times)
    print(f"Average query time: {avg_time:.4f} seconds ({len(rows)} rows)")
    
    return avg_time

# Test query performance
test_query = """
    SELECT c.city, COUNT(*) as customer_count, SUM(o.quantity * p.price) as revenue
    FROM customers c
    JOIN orders o ON c.customer_id = o.customer_id
    JOIN products p ON o.product_id = p.product_id
    WHERE o.order_date >= '2023-01-01'
    GROUP BY c.city
    ORDER BY revenue DESC
"""

performance_time = measure_query_performance(engine, test_query)
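
Indexes only help if the query planner actually uses them. In SQLite you can check with EXPLAIN QUERY PLAN (other databases offer their own EXPLAIN variants); a minimal check against the test query might look like this.

# Inspect the SQLite query plan to confirm index usage
with engine.connect() as conn:
    plan_rows = conn.execute(text("EXPLAIN QUERY PLAN " + test_query)).fetchall()

print("Query plan:")
for plan_row in plan_rows:
    # Look for 'USING INDEX idx_...' in the detail column of each step
    print(" ", plan_row)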

Working with Large Datasets

When datasets exceed memory limits, streaming and chunked processing become essential. These techniques let you analyze datasets that are much larger than your available RAM.

# Large dataset processing strategies
def process_large_dataset_streaming(engine, batch_size: int = 1000):
    """Process large datasets using streaming/chunked approach."""
    
    # Query that might return millions of rows
    large_query = """
        SELECT 
            o.order_id,
            o.customer_id,
            o.product_id,
            o.quantity,
            o.order_date,
            p.price,
            p.category,
            c.city
        FROM orders o
        JOIN products p ON o.product_id = p.product_id
        JOIN customers c ON o.customer_id = c.customer_id
    """
    
    # Process in chunks
    aggregated_results = {}
    total_processed = 0
    
    for chunk in pd.read_sql_query(large_query, engine, chunksize=batch_size):
        # Process each chunk
        chunk['revenue'] = chunk['quantity'] * chunk['price']
        chunk['order_date'] = pd.to_datetime(chunk['order_date'])
        chunk['month'] = chunk['order_date'].dt.to_period('M')
        
        # Aggregate results from this chunk
        chunk_agg = chunk.groupby(['city', 'category', 'month']).agg({
            'revenue': 'sum',
            'order_id': 'count'
        }).reset_index()
        
        # Combine with previous results
        for _, row in chunk_agg.iterrows():
            key = (row['city'], row['category'], str(row['month']))
            
            if key not in aggregated_results:
                aggregated_results[key] = {'revenue': 0, 'orders': 0}
            
            aggregated_results[key]['revenue'] += row['revenue']
            aggregated_results[key]['orders'] += row['order_id']
        
        total_processed += len(chunk)
        print(f"Processed {total_processed} rows...")
    
    # Convert aggregated results to DataFrame
    final_results = []
    for (city, category, month), metrics in aggregated_results.items():
        final_results.append({
            'city': city,
            'category': category,
            'month': month,
            'total_revenue': metrics['revenue'],
            'total_orders': metrics['orders']
        })
    
    return pd.DataFrame(final_results)

# Process large dataset
streaming_results = process_large_dataset_streaming(engine, batch_size=500)
print("Streaming Processing Results:")
print(streaming_results.head(10))
print(f"Total aggregated records: {len(streaming_results)}")

Database Integration Best Practices

Effective database integration requires understanding connection pooling, transaction management, and error handling. These practices ensure reliable and efficient data access.
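
Connection pooling is configured on the engine itself. The sketch below uses a placeholder PostgreSQL URL and assumes the psycopg2 driver is installed; the pool arguments are standard SQLAlchemy settings, though file-based SQLite engines largely ignore them.

# Pooling is configured when the engine is created (placeholder URL and credentials)
pooled_engine = create_engine(
    'postgresql+psycopg2://user:password@db-host:5432/analytics',
    pool_size=5,          # connections kept open in the pool
    max_overflow=10,      # temporary extra connections allowed under load
    pool_pre_ping=True,   # verify a connection is alive before handing it out
    pool_recycle=3600     # recycle connections older than an hour (seconds)
)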

# Connection management and best practices
@contextmanager
def database_transaction(engine):
    """Context manager for database transactions."""
    conn = engine.connect()
    trans = conn.begin()
    
    try:
        yield conn
        trans.commit()
    except Exception:
        trans.rollback()
        raise
    finally:
        conn.close()

def safe_bulk_insert(engine, df: pd.DataFrame, table_name: str, 
                    batch_size: int = 1000):
    """Safely insert large DataFrames in batches."""
    
    total_rows = len(df)
    inserted_rows = 0
    
    try:
        for start_idx in range(0, total_rows, batch_size):
            end_idx = min(start_idx + batch_size, total_rows)
            batch = df.iloc[start_idx:end_idx]
            
            with database_transaction(engine) as conn:
                batch.to_sql(table_name, conn, if_exists='append', 
                           index=False, method='multi')
            
            inserted_rows += len(batch)
            print(f"Inserted {inserted_rows}/{total_rows} rows")
    
    except Exception as e:
        print(f"Bulk insert failed at row {inserted_rows}: {e}")
        raise
    
    return inserted_rows

# Example: Create and insert new analysis results
analysis_results = pd.DataFrame({
    'analysis_date': [pd.Timestamp.now()] * 5,
    'metric_name': ['revenue', 'orders', 'customers', 'avg_order', 'conversion'],
    'metric_value': [150000, 1200, 800, 125, 0.67],
    'city': ['Tokyo'] * 5
})

# Safe bulk insert
try:
    inserted = safe_bulk_insert(engine, analysis_results, 'analysis_metrics')
    print(f"Successfully inserted {inserted} analysis records")
except Exception as e:
    print(f"Insert failed: {e}")

Database integration transforms data science from a memory-constrained activity to one that can handle enterprise-scale datasets. The key is leveraging database strengths for what they do best while using Python for complex analysis and visualization.

In our next part, we’ll explore model deployment and production considerations, learning how to take your data science work from notebooks to production systems that can serve real users and business processes.