Advanced Security and Optimization Techniques

This section explores sophisticated Docker security and performance patterns including custom security plugins, advanced profiling tools, and enterprise-grade monitoring solutions.

Custom Security Plugins

Runtime Security Engine

// security-engine/main.go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"
    
    "github.com/docker/docker/api/types"
    "github.com/docker/docker/client"
)

// SecurityEngine bundles the Docker API client, the policy set containers
// are checked against, and the channel that carries detected violations from
// the compliance checks to ProcessViolations.
type SecurityEngine struct {
    dockerClient *client.Client
    policies     *SecurityPolicies
    violations   chan SecurityViolation
}

// SecurityPolicies groups the three policy families the engine evaluates:
// image, runtime and network policies. The JSON tags define the field names
// used when the policy set is serialized.
type SecurityPolicies struct {
    ImagePolicies     []ImagePolicy     `json:"image_policies"`
    RuntimePolicies   []RuntimePolicy   `json:"runtime_policies"`
    NetworkPolicies   []NetworkPolicy   `json:"network_policies"`
}

// ImagePolicy describes which images a container may run.
//
// AllowedTags holds the patterns checkImageCompliance tests with
// matchesPattern. BlockedCVEs and MaxSeverity are declared and populated but
// are not read by any check in this file.
type ImagePolicy struct {
    Name        string   `json:"name"`
    AllowedTags []string `json:"allowed_tags"`
    BlockedCVEs []string `json:"blocked_cves"`
    MaxSeverity string   `json:"max_severity"`
}

// RuntimePolicy sets resource ceilings for a running container.
//
// MaxCPUPercent and MaxMemoryMB are the limits checkRuntimeCompliance compares
// against the container's stats. AllowedProcesses and BlockedSyscalls are
// declared and populated but are not read by any check in this file.
type RuntimePolicy struct {
    Name              string   `json:"name"`
    AllowedProcesses  []string `json:"allowed_processes"`
    BlockedSyscalls   []string `json:"blocked_syscalls"`
    MaxCPUPercent     float64  `json:"max_cpu_percent"`
    MaxMemoryMB       int64    `json:"max_memory_mb"`
}

// NetworkPolicy lists the ports a container is allowed to expose.
//
// AllowedPorts is what checkNetworkCompliance compares each exposed port
// against. BlockedDomains is declared and populated but is not read by any
// check in this file.
type NetworkPolicy struct {
    Name           string   `json:"name"`
    AllowedPorts   []int    `json:"allowed_ports"`
    BlockedDomains []string `json:"blocked_domains"`
}

// SecurityViolation is the record a compliance check sends on the engine's
// violations channel when a container breaks a policy.
//
// ViolationType is set by the checks to "image_policy", "runtime_policy" or
// "network_policy". Severity drives the automated response in handleViolation,
// which reacts to "CRITICAL", "HIGH" and "MEDIUM". Metadata carries
// check-specific details such as the offending port or measured CPU usage.
type SecurityViolation struct {
    ContainerID   string                 `json:"container_id"`
    ContainerName string                 `json:"container_name"`
    ViolationType string                 `json:"violation_type"`
    Severity      string                 `json:"severity"`
    Description   string                 `json:"description"`
    Timestamp     time.Time              `json:"timestamp"`
    Metadata      map[string]interface{} `json:"metadata"`
}

// NewSecurityEngine builds a SecurityEngine with a Docker client configured
// from the environment (DOCKER_HOST and friends) and a built-in default
// policy set.
//
// The client negotiates its API version with the daemon, so the engine does
// not fail with a "client version is too new" error against older daemons.
// The violations channel is buffered (1000 entries) so checks are decoupled
// from the consumer in ProcessViolations.
//
// It returns an error if the Docker client cannot be constructed.
func NewSecurityEngine() (*SecurityEngine, error) {
    dockerClient, err := client.NewClientWithOpts(
        client.FromEnv,
        client.WithAPIVersionNegotiation(),
    )
    if err != nil {
        return nil, fmt.Errorf("creating docker client: %w", err)
    }

    policies := &SecurityPolicies{
        ImagePolicies: []ImagePolicy{
            {
                Name:        "production-images",
                AllowedTags: []string{"latest", "stable", "v*"},
                BlockedCVEs: []string{"CVE-2021-44228", "CVE-2021-45046"},
                MaxSeverity: "HIGH",
            },
        },
        RuntimePolicies: []RuntimePolicy{
            {
                Name:             "standard-runtime",
                AllowedProcesses: []string{"node", "nginx", "postgres", "redis"},
                BlockedSyscalls:  []string{"ptrace", "mount", "umount"},
                MaxCPUPercent:    80.0,
                MaxMemoryMB:      2048,
            },
        },
        NetworkPolicies: []NetworkPolicy{
            {
                Name:           "web-tier",
                AllowedPorts:   []int{80, 443, 8080},
                BlockedDomains: []string{"malicious.com", "suspicious.net"},
            },
        },
    }

    return &SecurityEngine{
        dockerClient: dockerClient,
        policies:     policies,
        violations:   make(chan SecurityViolation, 1000),
    }, nil
}

// MonitorContainers scans the running containers once per tick of a
// ten-second ticker and returns when ctx is cancelled. The first scan happens
// on the first tick, not immediately on entry.
func (se *SecurityEngine) MonitorContainers(ctx context.Context) {
    const scanInterval = 10 * time.Second

    tick := time.NewTicker(scanInterval)
    defer tick.Stop()

    for {
        select {
        case <-tick.C:
            se.scanRunningContainers(ctx)
        case <-ctx.Done():
            return
        }
    }
}

// scanRunningContainers lists containers with the default list options and
// starts one analyzeContainer goroutine per container. It does not wait for
// those goroutines; a listing failure is logged and the scan is skipped.
func (se *SecurityEngine) scanRunningContainers(ctx context.Context) {
    listed, listErr := se.dockerClient.ContainerList(ctx, types.ContainerListOptions{})
    if listErr != nil {
        log.Printf("Error listing containers: %v", listErr)
        return
    }

    for i := range listed {
        go se.analyzeContainer(ctx, listed[i])
    }
}

// analyzeContainer runs the three compliance checks for one container in
// order: image, then runtime, then network. Each check reports its own
// violations on the engine's channel.
func (se *SecurityEngine) analyzeContainer(ctx context.Context, container types.Container) {
    // Image policy needs only the listing data, so it takes no context.
    se.checkImageCompliance(container)

    // Runtime and network checks call the Docker API and therefore carry ctx.
    se.checkRuntimeCompliance(ctx, container)
    se.checkNetworkCompliance(ctx, container)
}

// checkImageCompliance reports an "image_policy" violation for every image
// policy whose AllowedTags match neither the container image's tag nor its
// full image reference.
//
// The previous version compared the whole reference (e.g. "nginx:latest")
// against tag patterns such as "latest", so practically every container was
// flagged. The tag is now extracted first; matching against the full
// reference is kept so that policies written for the old behavior still work.
//
// NOTE(review): the send on se.violations blocks when the channel buffer is
// full; this function has no context with which to abandon the send.
func (se *SecurityEngine) checkImageCompliance(container types.Container) {
    // Names can be empty; indexing it unconditionally would panic.
    var containerName string
    if len(container.Names) > 0 {
        containerName = container.Names[0]
    }

    image := container.Image
    tag := imageTag(image)

    for _, policy := range se.policies.ImagePolicies {
        allowed := false
        for _, allowedTag := range policy.AllowedTags {
            if matchesPattern(tag, allowedTag) || matchesPattern(image, allowedTag) {
                allowed = true
                break
            }
        }
        if allowed {
            continue
        }

        se.violations <- SecurityViolation{
            ContainerID:   container.ID,
            ContainerName: containerName,
            ViolationType: "image_policy",
            Severity:      "HIGH",
            Description:   fmt.Sprintf("Image tag %s not allowed by policy %s", image, policy.Name),
            Timestamp:     time.Now(),
            Metadata: map[string]interface{}{
                "image":  image,
                "tag":    tag,
                "policy": policy.Name,
            },
        }
    }
}

// imageTag extracts the tag from an image reference such as
// "registry:5000/team/app:v1.2". A reference without a tag yields "latest",
// except when it is pinned by digest ("name@sha256:..."), which yields "".
func imageTag(ref string) string {
    // Strip a digest suffix; anything after '@' is not part of the tag.
    pinned := false
    for i := 0; i < len(ref); i++ {
        if ref[i] == '@' {
            ref = ref[:i]
            pinned = true
            break
        }
    }

    // The tag separator is a ':' in the last path component; a ':' before the
    // last '/' belongs to a registry host:port.
    for i := len(ref) - 1; i >= 0 && ref[i] != '/'; i-- {
        if ref[i] == ':' {
            return ref[i+1:]
        }
    }

    if pinned {
        return ""
    }
    return "latest"
}

// checkRuntimeCompliance takes a one-shot stats sample for the container and
// reports a "runtime_policy" violation for every runtime policy whose CPU
// percentage or memory limit is exceeded.
//
// Failures to fetch or decode the stats are logged instead of being silently
// dropped, unless ctx has already been cancelled. Sends on the violations
// channel give up when ctx is cancelled so the goroutine cannot hang on a
// full channel during shutdown.
func (se *SecurityEngine) checkRuntimeCompliance(ctx context.Context, container types.Container) {
    // Get container stats (stream=false: a single sample).
    stats, err := se.dockerClient.ContainerStats(ctx, container.ID, false)
    if err != nil {
        if ctx.Err() == nil {
            log.Printf("Error fetching stats for container %s: %v", container.ID, err)
        }
        return
    }
    defer stats.Body.Close()

    var containerStats types.StatsJSON
    if err := json.NewDecoder(stats.Body).Decode(&containerStats); err != nil {
        if ctx.Err() == nil {
            log.Printf("Error decoding stats for container %s: %v", container.ID, err)
        }
        return
    }

    // Names can be empty; indexing it unconditionally would panic.
    var containerName string
    if len(container.Names) > 0 {
        containerName = container.Names[0]
    }

    // report delivers a violation, or returns false if ctx ends first.
    report := func(v SecurityViolation) bool {
        select {
        case se.violations <- v:
            return true
        case <-ctx.Done():
            return false
        }
    }

    // Check CPU usage
    cpuPercent := calculateCPUPercent(&containerStats)
    for _, policy := range se.policies.RuntimePolicies {
        if cpuPercent <= policy.MaxCPUPercent {
            continue
        }
        ok := report(SecurityViolation{
            ContainerID:   container.ID,
            ContainerName: containerName,
            ViolationType: "runtime_policy",
            Severity:      "MEDIUM",
            Description:   fmt.Sprintf("CPU usage %.2f%% exceeds policy limit %.2f%%", cpuPercent, policy.MaxCPUPercent),
            Timestamp:     time.Now(),
            Metadata: map[string]interface{}{
                "cpu_percent": cpuPercent,
                "limit":       policy.MaxCPUPercent,
            },
        })
        if !ok {
            return
        }
    }

    // Check memory usage (bytes -> whole MiB).
    memoryMB := containerStats.MemoryStats.Usage / 1024 / 1024
    for _, policy := range se.policies.RuntimePolicies {
        if int64(memoryMB) <= policy.MaxMemoryMB {
            continue
        }
        ok := report(SecurityViolation{
            ContainerID:   container.ID,
            ContainerName: containerName,
            ViolationType: "runtime_policy",
            Severity:      "HIGH",
            Description:   fmt.Sprintf("Memory usage %dMB exceeds policy limit %dMB", memoryMB, policy.MaxMemoryMB),
            Timestamp:     time.Now(),
            Metadata: map[string]interface{}{
                "memory_mb": memoryMB,
                "limit":     policy.MaxMemoryMB,
            },
        })
        if !ok {
            return
        }
    }
}

// checkNetworkCompliance inspects the container and reports a
// "network_policy" violation for every port in its network settings that no
// network policy lists in AllowedPorts.
//
// An inspect failure is logged (unless ctx was cancelled) rather than
// silently dropped, missing network settings are tolerated, and sends on the
// violations channel give up when ctx is cancelled.
func (se *SecurityEngine) checkNetworkCompliance(ctx context.Context, container types.Container) {
    // Get container network settings
    containerJSON, err := se.dockerClient.ContainerInspect(ctx, container.ID)
    if err != nil {
        if ctx.Err() == nil {
            log.Printf("Error inspecting container %s: %v", container.ID, err)
        }
        return
    }
    if containerJSON.NetworkSettings == nil {
        return
    }

    // Names can be empty; indexing it unconditionally would panic.
    var containerName string
    if len(container.Names) > 0 {
        containerName = container.Names[0]
    }

    // portAllowed reports whether any network policy permits the port.
    portAllowed := func(portNum int) bool {
        for _, policy := range se.policies.NetworkPolicies {
            for _, allowedPort := range policy.AllowedPorts {
                if portNum == allowedPort {
                    return true
                }
            }
        }
        return false
    }

    // Check exposed ports
    for port := range containerJSON.NetworkSettings.Ports {
        portNum := port.Int()
        if portAllowed(portNum) {
            continue
        }

        violation := SecurityViolation{
            ContainerID:   container.ID,
            ContainerName: containerName,
            ViolationType: "network_policy",
            Severity:      "MEDIUM",
            Description:   fmt.Sprintf("Port %d not allowed by network policy", portNum),
            Timestamp:     time.Now(),
            Metadata: map[string]interface{}{
                "port": portNum,
            },
        }

        select {
        case se.violations <- violation:
        case <-ctx.Done():
            return
        }
    }
}

// ProcessViolations receives violations from the engine's channel and hands
// each one to handleViolation, one at a time, until ctx is cancelled.
func (se *SecurityEngine) ProcessViolations(ctx context.Context) {
    done := ctx.Done()
    for {
        select {
        case v := <-se.violations:
            se.handleViolation(v)
        case <-done:
            return
        }
    }
}

// handleViolation logs the violation, forwards it to the SIEM hook, and then
// picks a response by severity: CRITICAL stops the container, HIGH alerts the
// security team, MEDIUM is recorded for review. Any other severity gets no
// further action.
func (se *SecurityEngine) handleViolation(violation SecurityViolation) {
    log.Printf("Security Violation: %+v", violation)
    se.sendToSIEM(violation)

    if sev := violation.Severity; sev == "CRITICAL" {
        se.quarantineContainer(violation.ContainerID)
    } else if sev == "HIGH" {
        se.alertSecurityTeam(violation)
    } else if sev == "MEDIUM" {
        se.logForReview(violation)
    }
}

// quarantineContainer stops the given container, allowing it a 30-second
// grace period before the daemon kills it.
//
// The grace period is a time.Duration because the client API generation this
// file targets (the one that provides types.ContainerListOptions) takes a
// *time.Duration; the previous *int did not match that signature.
// NOTE(review): SDK releases whose ContainerStop takes container.StopOptions
// need that struct here instead - confirm against the pinned SDK version.
//
// The success message is logged only when the stop actually succeeded, and
// the API call is bounded by a timeout so a hung daemon cannot block
// violation processing forever.
func (se *SecurityEngine) quarantineContainer(containerID string) {
    timeout := 30 * time.Second

    // Allow the grace period plus the same again for the API round trip.
    ctx, cancel := context.WithTimeout(context.Background(), 2*timeout)
    defer cancel()

    // Stop the container
    if err := se.dockerClient.ContainerStop(ctx, containerID, &timeout); err != nil {
        log.Printf("Failed to stop container %s: %v", containerID, err)
        return
    }

    log.Printf("Container %s quarantined due to critical security violation", containerID)
}

// sendToSIEM is the hook handleViolation calls to forward every violation to
// an external SIEM. It is currently an empty placeholder: nothing is sent.
func (se *SecurityEngine) sendToSIEM(violation SecurityViolation) {
    // Implementation for SIEM integration
    // This could be Splunk, ELK, or other SIEM systems
}

// alertSecurityTeam is the hook handleViolation calls for HIGH-severity
// violations. It is currently an empty placeholder: no alert is raised.
func (se *SecurityEngine) alertSecurityTeam(violation SecurityViolation) {
    // Implementation for alerting (Slack, PagerDuty, etc.)
}

// logForReview is the hook handleViolation calls for MEDIUM-severity
// violations. It is currently an empty placeholder: nothing is recorded.
func (se *SecurityEngine) logForReview(violation SecurityViolation) {
    // Implementation for logging violations for manual review
}

// calculateCPUPercent converts a stats sample into a CPU usage percentage,
// where 100% corresponds to one fully used CPU.
//
// It takes the container's CPU time delta over the system CPU time delta
// between the previous and current reading, scaled by the number of CPUs.
// It returns 0 when stats is nil or either delta is not positive.
//
// The deltas are computed in float64: subtracting the raw uint64 counters
// would wrap to a huge positive number if a counter ever went backwards.
// The CPU count comes from OnlineCPUs and falls back to len(PercpuUsage) only
// when OnlineCPUs is unset; on cgroup v2 hosts PercpuUsage is empty, which
// made the previous version always return 0.
func calculateCPUPercent(stats *types.StatsJSON) float64 {
    if stats == nil {
        return 0.0
    }

    cpuDelta := float64(stats.CPUStats.CPUUsage.TotalUsage) - float64(stats.PreCPUStats.CPUUsage.TotalUsage)
    systemDelta := float64(stats.CPUStats.SystemUsage) - float64(stats.PreCPUStats.SystemUsage)

    onlineCPUs := float64(stats.CPUStats.OnlineCPUs)
    if onlineCPUs == 0.0 {
        onlineCPUs = float64(len(stats.CPUStats.CPUUsage.PercpuUsage))
    }

    if systemDelta > 0.0 && cpuDelta > 0.0 {
        return (cpuDelta / systemDelta) * onlineCPUs * 100.0
    }
    return 0.0
}

// matchesPattern reports whether text matches pattern, where '*' in pattern
// matches any run of bytes (including none) and every other byte must match
// literally.
//
// The previous version accepted only exact equality or a bare "*", so the
// default policy entry "v*" could never match. Every input that matched
// before still matches.
func matchesPattern(text, pattern string) bool {
    t, p := 0, 0
    // star is the index of the most recent '*' in pattern (-1 if none yet);
    // mark is the text position that '*' is currently assumed to extend to.
    star, mark := -1, 0

    for t < len(text) {
        switch {
        case p < len(pattern) && pattern[p] == '*':
            // Start by letting '*' match nothing; extend on a later mismatch.
            star, mark = p, t
            p++
        case p < len(pattern) && pattern[p] == text[t]:
            p++
            t++
        case star >= 0:
            // Mismatch: let the last '*' swallow one more byte and retry.
            mark++
            t = mark
            p = star + 1
        default:
            return false
        }
    }

    // Text is consumed; only trailing '*' may remain in the pattern.
    for p < len(pattern) && pattern[p] == '*' {
        p++
    }
    return p == len(pattern)
}

// main builds the engine, starts the monitoring and violation-processing
// goroutines, and serves the HTTP API on :8080.
//
// The API runs on its own ServeMux and an http.Server with explicit timeouts:
// http.ListenAndServe with the default server has none, so slow or stalled
// clients could hold connections open indefinitely.
func main() {
    engine, err := NewSecurityEngine()
    if err != nil {
        log.Fatal(err)
    }

    ctx := context.Background()

    // Start monitoring
    go engine.MonitorContainers(ctx)
    go engine.ProcessViolations(ctx)

    // Start HTTP API
    mux := http.NewServeMux()
    mux.HandleFunc("/violations", func(w http.ResponseWriter, r *http.Request) {
        // Placeholder: reports a static status; recent violations are not
        // stored anywhere yet.
        w.Header().Set("Content-Type", "application/json")
        if err := json.NewEncoder(w).Encode(map[string]string{"status": "ok"}); err != nil {
            log.Printf("Error writing /violations response: %v", err)
        }
    })

    server := &http.Server{
        Addr:              ":8080",
        Handler:           mux,
        ReadHeaderTimeout: 5 * time.Second,
        ReadTimeout:       10 * time.Second,
        WriteTimeout:      10 * time.Second,
        IdleTimeout:       60 * time.Second,
    }

    log.Println("Security Engine started on :8080")
    log.Fatal(server.ListenAndServe())
}

Advanced Performance Profiling

Container Performance Analyzer

#!/usr/bin/env python3
# performance-analyzer.py

import asyncio
import docker
import psutil
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

class PerformanceAnalyzer:
    def __init__(self) -> None:
        # Docker client configured from the environment.
        self.docker_client = docker.from_env()
        # container id -> list of metric dicts appended by continuous_monitoring.
        self.metrics_history: Dict[str, List[Dict]] = {}
        # Initialised empty; no method of this class writes to it.
        self.analysis_results: Dict = {}
        
    def collect_system_metrics(self) -> Dict:
        """Collect system-wide performance metrics.

        Returns a dict with 'timestamp', 'cpu', 'memory', 'disk' and
        'network' sections, with the same keys as before.

        Changes from the earlier version:
        - CPU is sampled once (one blocking second instead of two); the
          overall percentage is the mean of the per-CPU readings.
        - Memory comes from a single virtual_memory() snapshot, so the
          fields are mutually consistent.
        - 'buffers' and 'cached' are None on platforms where psutil does
          not provide them, instead of raising AttributeError.
        - Mount points whose usage cannot be read are skipped, and the
          connection count is None when psutil refuses access.
        """
        timestamp = datetime.now().isoformat()

        # One blocking 1-second sample; the overall figure is derived from it.
        per_cpu = psutil.cpu_percent(interval=1, percpu=True)
        overall_cpu = round(sum(per_cpu) / len(per_cpu), 1) if per_cpu else 0.0

        cpu_freq = psutil.cpu_freq()
        memory = psutil.virtual_memory()

        disk_usage = {}
        for partition in psutil.disk_partitions():
            try:
                disk_usage[partition.mountpoint] = \
                    psutil.disk_usage(partition.mountpoint)._asdict()
            except OSError:
                # e.g. an empty removable drive or an unreadable mount point
                continue

        disk_io = psutil.disk_io_counters()

        try:
            connection_count = len(psutil.net_connections())
        except psutil.Error:
            # Listing all connections can require elevated privileges.
            connection_count = None

        return {
            'timestamp': timestamp,
            'cpu': {
                'percent': overall_cpu,
                'count': psutil.cpu_count(),
                'freq': cpu_freq._asdict() if cpu_freq else None,
                'per_cpu': per_cpu,
                'load_avg': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else None
            },
            'memory': {
                'total': memory.total,
                'available': memory.available,
                'percent': memory.percent,
                'used': memory.used,
                'free': memory.free,
                'buffers': getattr(memory, 'buffers', None),
                'cached': getattr(memory, 'cached', None)
            },
            'disk': {
                'usage': disk_usage,
                'io': disk_io._asdict() if disk_io else None
            },
            'network': {
                'io': psutil.net_io_counters()._asdict(),
                'connections': connection_count
            }
        }
    
    def collect_container_metrics(self, container) -> Optional[Dict]:
        """Collect detailed performance metrics for one container.

        Takes a single stats sample via ``container.stats(stream=False)`` and
        returns a dict with 'cpu', 'memory', 'network' and 'blkio' sections,
        or None if the sample could not be taken or parsed (the error is
        printed).

        Fields that the daemon may omit are tolerated instead of turning the
        whole sample into None:
        - the CPU count comes from 'online_cpus', falling back to the length
          of 'percpu_usage' (absent on cgroup v2 hosts);
        - 'networks' may be missing, e.g. with host networking;
        - 'io_service_bytes_recursive' may be null, and its 'op' values are
          compared case-insensitively;
        - a zero or missing memory limit yields 0.0 percent rather than a
          division by zero.
        """
        try:
            stats = container.stats(stream=False)

            cpu_stats = stats.get('cpu_stats') or {}
            precpu_stats = stats.get('precpu_stats') or {}
            cpu_usage = cpu_stats.get('cpu_usage') or {}
            precpu_usage = precpu_stats.get('cpu_usage') or {}

            # Calculate CPU percentage
            total_usage = cpu_usage.get('total_usage', 0)
            system_usage = cpu_stats.get('system_cpu_usage', 0)
            cpu_delta = total_usage - precpu_usage.get('total_usage', 0)
            system_delta = system_usage - precpu_stats.get('system_cpu_usage', 0)

            online_cpus = cpu_stats.get('online_cpus') or \
                len(cpu_usage.get('percpu_usage') or [])

            cpu_percent = 0.0
            if system_delta > 0 and cpu_delta > 0:
                cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0

            # Memory metrics
            memory_stats = stats.get('memory_stats') or {}
            memory_detail = memory_stats.get('stats') or {}
            memory_usage = memory_stats.get('usage', 0)
            memory_limit = memory_stats.get('limit', 0)
            memory_percent = (memory_usage / memory_limit) * 100.0 if memory_limit else 0.0

            # Network metrics
            networks = stats.get('networks') or {}
            interfaces = list(networks.values())

            # Block I/O metrics
            blkio_stats = stats.get('blkio_stats') or {}
            io_service_bytes = blkio_stats.get('io_service_bytes_recursive') or []

            def _io_total(operation: str) -> int:
                return sum(entry.get('value', 0) for entry in io_service_bytes
                           if str(entry.get('op', '')).lower() == operation)

            return {
                'timestamp': datetime.now().isoformat(),
                'container_id': container.id,
                'container_name': container.name,
                'cpu': {
                    'percent': cpu_percent,
                    'usage': total_usage,
                    'system_usage': system_usage,
                    'throttling': cpu_stats.get('throttling_data', {})
                },
                'memory': {
                    'usage': memory_usage,
                    'limit': memory_limit,
                    'percent': memory_percent,
                    'cache': memory_detail.get('cache', 0),
                    'rss': memory_detail.get('rss', 0)
                },
                'network': {
                    'rx_bytes': sum(net.get('rx_bytes', 0) for net in interfaces),
                    'tx_bytes': sum(net.get('tx_bytes', 0) for net in interfaces),
                    'rx_packets': sum(net.get('rx_packets', 0) for net in interfaces),
                    'tx_packets': sum(net.get('tx_packets', 0) for net in interfaces)
                },
                'blkio': {
                    'read_bytes': _io_total('read'),
                    'write_bytes': _io_total('write')
                }
            }

        except Exception as e:
            print(f"Error collecting metrics for container {container.name}: {e}")
            return None
    
    def analyze_performance_trends(self, container_id: str, hours: int = 24) -> Dict:
        """Analyze performance trends for a container"""
        if container_id not in self.metrics_history:
            return {"error": "No metrics history found"}
        
        metrics = self.metrics_history[container_id]
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        # Filter recent metrics
        recent_metrics = [m for m in metrics 
                         if datetime.fromisoformat(m['timestamp']) > cutoff_time]
        
        if not recent_metrics:
            return {"error": "No recent metrics found"}
        
        # Extract time series data
        timestamps = [datetime.fromisoformat(m['timestamp']) for m in recent_metrics]
        cpu_values = [m['cpu']['percent'] for m in recent_metrics]
        memory_values = [m['memory']['percent'] for m in recent_metrics]
        
        # Calculate statistics
        analysis = {
            'container_id': container_id,
            'analysis_period': f"{hours} hours",
            'sample_count': len(recent_metrics),
            'cpu': {
                'mean': np.mean(cpu_values),
                'std': np.std(cpu_values),
                'min': np.min(cpu_values),
                'max': np.max(cpu_values),
                'p95': np.percentile(cpu_values, 95),
                'p99': np.percentile(cpu_values, 99)
            },
            'memory': {
                'mean': np.mean(memory_values),
                'std': np.std(memory_values),
                'min': np.min(memory_values),
                'max': np.max(memory_values),
                'p95': np.percentile(memory_values, 95),
                'p99': np.percentile(memory_values, 99)
            }
        }
        
        # Detect anomalies
        analysis['anomalies'] = self.detect_anomalies(recent_metrics)
        
        # Performance recommendations
        analysis['recommendations'] = self.generate_recommendations(analysis)
        
        return analysis
    
    def detect_anomalies(self, metrics: List[Dict]) -> List[Dict]:
        """Flag samples whose CPU or memory percentage is unusually high.

        For each resource, a sample is an anomaly when its percentage is more
        than two standard deviations above the mean of all given samples; it
        is rated 'high' beyond three standard deviations and 'medium'
        otherwise. CPU anomalies are listed first, then memory anomalies,
        each in sample order.
        """
        # Pull both series up front so a malformed sample fails before any
        # anomaly has been recorded.
        series_by_resource = {
            'cpu': [sample['cpu']['percent'] for sample in metrics],
            'memory': [sample['memory']['percent'] for sample in metrics],
        }

        found = []
        for resource, label in (('cpu', 'cpu_spike'), ('memory', 'memory_spike')):
            series = series_by_resource[resource]
            center = np.mean(series)
            spread = np.std(series)
            warn_level = center + 2 * spread

            for sample in metrics:
                reading = sample[resource]['percent']
                if not reading > warn_level:
                    continue
                found.append({
                    'type': label,
                    'timestamp': sample['timestamp'],
                    'value': reading,
                    'threshold': warn_level,
                    'severity': 'high' if reading > center + 3 * spread else 'medium'
                })

        return found
    
    def generate_recommendations(self, analysis: Dict) -> List[str]:
        """Turn summary statistics into a list of tuning suggestions.

        Reads the 'cpu' and 'memory' sections of ``analysis`` (and the
        optional 'anomalies' list) and returns the messages whose threshold
        is exceeded, in a fixed order: CPU, then memory, then general.
        """
        cpu = analysis['cpu']
        mem = analysis['memory']

        # (condition, message) pairs, evaluated top to bottom.
        rules = [
            (cpu['p95'] > 80,
             "Consider increasing CPU limits or optimizing CPU-intensive operations"),
            (cpu['std'] > 20,
             "High CPU variance detected - investigate workload patterns"),
            (mem['p95'] > 85,
             "Consider increasing memory limits or optimizing memory usage"),
            (mem['max'] > 95,
             "Memory usage approaching limits - risk of OOM kills"),
            (len(analysis.get('anomalies', [])) > 10,
             "Frequent anomalies detected - review application performance"),
        ]

        return [message for triggered, message in rules if triggered]
    
    def generate_performance_report(self, container_id: str) -> str:
        """Generate comprehensive performance report"""
        analysis = self.analyze_performance_trends(container_id)
        
        if 'error' in analysis:
            return f"Error generating report: {analysis['error']}"
        
        report = f"""
Performance Analysis Report
Container ID: {container_id}
Analysis Period: {analysis['analysis_period']}
Sample Count: {analysis['sample_count']}

CPU Performance:
- Average: {analysis['cpu']['mean']:.2f}%
- 95th Percentile: {analysis['cpu']['p95']:.2f}%
- Maximum: {analysis['cpu']['max']:.2f}%
- Standard Deviation: {analysis['cpu']['std']:.2f}%

Memory Performance:
- Average: {analysis['memory']['mean']:.2f}%
- 95th Percentile: {analysis['memory']['p95']:.2f}%
- Maximum: {analysis['memory']['max']:.2f}%
- Standard Deviation: {analysis['memory']['std']:.2f}%

Anomalies Detected: {len(analysis['anomalies'])}

Recommendations:
"""
        
        for i, rec in enumerate(analysis['recommendations'], 1):
            report += f"{i}. {rec}\n"
        
        return report
    
    def create_performance_dashboard(self, container_id: str, output_file: str = "performance_dashboard.png"):
        """Save a 2x2 chart of a container's recorded CPU and memory usage.

        The top row plots CPU % and memory % over time; the bottom row shows
        a 30-bin histogram of each. The figure is written to ``output_file``
        at 300 dpi and then closed. If the container has no history, a
        message is printed and nothing is written.
        """
        if container_id not in self.metrics_history:
            print("No metrics history found")
            return

        samples = self.metrics_history[container_id]

        # Extract every series before creating the figure.
        when = [datetime.fromisoformat(s['timestamp']) for s in samples]
        cpu_series = [s['cpu']['percent'] for s in samples]
        memory_series = [s['memory']['percent'] for s in samples]

        _figure, grid = plt.subplots(2, 2, figsize=(15, 10))

        # One column per resource: (label, colour, readings, line axis, histogram axis)
        columns = (
            ('CPU', 'blue', cpu_series, grid[0][0], grid[1][0]),
            ('Memory', 'red', memory_series, grid[0][1], grid[1][1]),
        )

        for label, colour, readings, line_ax, hist_ax in columns:
            # Usage over time
            line_ax.plot(when, readings, label=f'{label} %', color=colour)
            line_ax.set_title(f'{label} Usage Over Time')
            line_ax.set_ylabel(f'{label} %')
            line_ax.legend()
            line_ax.grid(True)

            # Distribution of readings
            hist_ax.hist(readings, bins=30, alpha=0.7, color=colour)
            hist_ax.set_title(f'{label} Usage Distribution')
            hist_ax.set_xlabel(f'{label} %')
            hist_ax.set_ylabel('Frequency')

        plt.tight_layout()
        plt.savefig(output_file, dpi=300, bbox_inches='tight')
        plt.close()

        print(f"Performance dashboard saved to {output_file}")
    
    async def continuous_monitoring(self, duration_hours: int = 24):
        """Run continuous performance monitoring"""
        end_time = datetime.now() + timedelta(hours=duration_hours)
        
        while datetime.now() < end_time:
            # Collect system metrics
            system_metrics = self.collect_system_metrics()
            
            # Collect container metrics
            for container in self.docker_client.containers.list():
                container_metrics = self.collect_container_metrics(container)
                
                if container_metrics:
                    container_id = container.id
                    if container_id not in self.metrics_history:
                        self.metrics_history[container_id] = []
                    
                    self.metrics_history[container_id].append(container_metrics)
                    
                    # Keep only last 1000 metrics per container
                    if len(self.metrics_history[container_id]) > 1000:
                        self.metrics_history[container_id] = self.metrics_history[container_id][-1000:]
            
            # Wait before next collection
            await asyncio.sleep(30)  # Collect every 30 seconds
        
        print(f"Monitoring completed after {duration_hours} hours")

if __name__ == "__main__":
    # Script entry point: sample for one hour, then report on every container
    # that produced at least one sample.
    analyzer = PerformanceAnalyzer()
    asyncio.run(analyzer.continuous_monitoring(duration_hours=1))

    for monitored_id in analyzer.metrics_history:
        report_text = analyzer.generate_performance_report(monitored_id)
        print(report_text)

        # File name uses the 12-character short form of the container id.
        dashboard_path = f"dashboard_{monitored_id[:12]}.png"
        analyzer.create_performance_dashboard(monitored_id, dashboard_path)

Summary

This section covered advanced security and optimization techniques:

Custom Security Solutions

  • Runtime Security Engine: Go-based security policy enforcement that polls running containers every 10 seconds
  • Policy Framework: Comprehensive image, runtime, and network policy definitions
  • Automated Response: Container quarantine and security team alerting

Advanced Performance Analysis

  • Performance Analyzer: Python-based comprehensive metrics collection and analysis
  • Anomaly Detection: Statistical methods for identifying performance issues
  • Trend Analysis: Historical performance analysis with recommendations

Enterprise Patterns

  • Security Automation: Policy-driven security enforcement and violation handling
  • Performance Intelligence: Statistics-driven performance optimization recommendations (percentile thresholds and standard-deviation anomaly detection)
  • Continuous Monitoring: Real-time security and performance assessment

Next Steps: Part 5 demonstrates complete production implementations combining all these advanced techniques into enterprise-ready security and optimization solutions.