Advanced Security and Optimization Techniques
This section explores sophisticated Docker security and performance patterns including custom security plugins, advanced profiling tools, and enterprise-grade monitoring solutions.
Custom Security Plugins
Runtime Security Engine
// security-engine/main.go
package main
import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "strings"
    "time"

    "github.com/docker/docker/api/types"
    "github.com/docker/docker/client"
)
type SecurityEngine struct {
dockerClient *client.Client
policies *SecurityPolicies
violations chan SecurityViolation
}
type SecurityPolicies struct {
ImagePolicies []ImagePolicy `json:"image_policies"`
RuntimePolicies []RuntimePolicy `json:"runtime_policies"`
NetworkPolicies []NetworkPolicy `json:"network_policies"`
}
type ImagePolicy struct {
Name string `json:"name"`
AllowedTags []string `json:"allowed_tags"`
BlockedCVEs []string `json:"blocked_cves"`
MaxSeverity string `json:"max_severity"`
}
type RuntimePolicy struct {
Name string `json:"name"`
AllowedProcesses []string `json:"allowed_processes"`
BlockedSyscalls []string `json:"blocked_syscalls"`
MaxCPUPercent float64 `json:"max_cpu_percent"`
MaxMemoryMB int64 `json:"max_memory_mb"`
}
type NetworkPolicy struct {
Name string `json:"name"`
AllowedPorts []int `json:"allowed_ports"`
BlockedDomains []string `json:"blocked_domains"`
}
type SecurityViolation struct {
ContainerID string `json:"container_id"`
ContainerName string `json:"container_name"`
ViolationType string `json:"violation_type"`
Severity string `json:"severity"`
Description string `json:"description"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata"`
}
func NewSecurityEngine() (*SecurityEngine, error) {
    dockerClient, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
if err != nil {
return nil, err
}
policies := &SecurityPolicies{
ImagePolicies: []ImagePolicy{
{
Name: "production-images",
AllowedTags: []string{"latest", "stable", "v*"},
BlockedCVEs: []string{"CVE-2021-44228", "CVE-2021-45046"},
MaxSeverity: "HIGH",
},
},
RuntimePolicies: []RuntimePolicy{
{
Name: "standard-runtime",
AllowedProcesses: []string{"node", "nginx", "postgres", "redis"},
BlockedSyscalls: []string{"ptrace", "mount", "umount"},
MaxCPUPercent: 80.0,
MaxMemoryMB: 2048,
},
},
NetworkPolicies: []NetworkPolicy{
{
Name: "web-tier",
AllowedPorts: []int{80, 443, 8080},
BlockedDomains: []string{"malicious.com", "suspicious.net"},
},
},
}
return &SecurityEngine{
dockerClient: dockerClient,
policies: policies,
violations: make(chan SecurityViolation, 1000),
}, nil
}
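// The policies above are hard-coded for clarity. Because each policy struct
// carries JSON tags, the same definitions could instead be loaded from a file
// at startup. A minimal, hypothetical loader sketch (requires adding "os" to
// the import block above; the file layout mirrors the SecurityPolicies struct):
func LoadPoliciesFromFile(path string) (*SecurityPolicies, error) {
    data, err := os.ReadFile(path)
    if err != nil {
        return nil, fmt.Errorf("reading policy file %s: %w", path, err)
    }
    var policies SecurityPolicies
    if err := json.Unmarshal(data, &policies); err != nil {
        return nil, fmt.Errorf("parsing policy file %s: %w", path, err)
    }
    return &policies, nil
}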
func (se *SecurityEngine) MonitorContainers(ctx context.Context) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
se.scanRunningContainers(ctx)
}
}
}
func (se *SecurityEngine) scanRunningContainers(ctx context.Context) {
containers, err := se.dockerClient.ContainerList(ctx, types.ContainerListOptions{})
if err != nil {
log.Printf("Error listing containers: %v", err)
return
}
for _, container := range containers {
go se.analyzeContainer(ctx, container)
}
}
func (se *SecurityEngine) analyzeContainer(ctx context.Context, container types.Container) {
// Check image compliance
se.checkImageCompliance(container)
// Check runtime compliance
se.checkRuntimeCompliance(ctx, container)
// Check network compliance
se.checkNetworkCompliance(ctx, container)
}
func (se *SecurityEngine) checkImageCompliance(container types.Container) {
    // container.Image is the full reference (e.g. "nginx:1.25"); compare the tag portion.
    imageRef := container.Image
    imageTag := "latest"
    if idx := strings.LastIndex(imageRef, ":"); idx > strings.LastIndex(imageRef, "/") {
        imageTag = imageRef[idx+1:]
    }
    for _, policy := range se.policies.ImagePolicies {
        // Check whether the image tag is allowed by this policy
        allowed := false
        for _, allowedTag := range policy.AllowedTags {
            if matchesPattern(imageTag, allowedTag) {
                allowed = true
                break
            }
        }
        if !allowed {
            se.violations <- SecurityViolation{
                ContainerID:   container.ID,
                ContainerName: container.Names[0],
                ViolationType: "image_policy",
                Severity:      "HIGH",
                Description:   fmt.Sprintf("Image %s (tag %q) not allowed by policy %s", imageRef, imageTag, policy.Name),
                Timestamp:     time.Now(),
                Metadata: map[string]interface{}{
                    "image":  imageRef,
                    "tag":    imageTag,
                    "policy": policy.Name,
                },
            }
        }
    }
}
func (se *SecurityEngine) checkRuntimeCompliance(ctx context.Context, container types.Container) {
// Get container stats
stats, err := se.dockerClient.ContainerStats(ctx, container.ID, false)
if err != nil {
return
}
defer stats.Body.Close()
var containerStats types.StatsJSON
if err := json.NewDecoder(stats.Body).Decode(&containerStats); err != nil {
return
}
// Check CPU usage
cpuPercent := calculateCPUPercent(&containerStats)
for _, policy := range se.policies.RuntimePolicies {
if cpuPercent > policy.MaxCPUPercent {
se.violations <- SecurityViolation{
ContainerID: container.ID,
ContainerName: container.Names[0],
ViolationType: "runtime_policy",
Severity: "MEDIUM",
Description: fmt.Sprintf("CPU usage %.2f%% exceeds policy limit %.2f%%", cpuPercent, policy.MaxCPUPercent),
Timestamp: time.Now(),
Metadata: map[string]interface{}{
"cpu_percent": cpuPercent,
"limit": policy.MaxCPUPercent,
},
}
}
}
// Check memory usage
memoryMB := containerStats.MemoryStats.Usage / 1024 / 1024
for _, policy := range se.policies.RuntimePolicies {
if int64(memoryMB) > policy.MaxMemoryMB {
se.violations <- SecurityViolation{
ContainerID: container.ID,
ContainerName: container.Names[0],
ViolationType: "runtime_policy",
Severity: "HIGH",
Description: fmt.Sprintf("Memory usage %dMB exceeds policy limit %dMB", memoryMB, policy.MaxMemoryMB),
Timestamp: time.Now(),
Metadata: map[string]interface{}{
"memory_mb": memoryMB,
"limit": policy.MaxMemoryMB,
},
}
}
}
}
func (se *SecurityEngine) checkNetworkCompliance(ctx context.Context, container types.Container) {
// Get container network settings
containerJSON, err := se.dockerClient.ContainerInspect(ctx, container.ID)
if err != nil {
return
}
// Check exposed ports
for port := range containerJSON.NetworkSettings.Ports {
portNum := port.Int()
allowed := false
for _, policy := range se.policies.NetworkPolicies {
for _, allowedPort := range policy.AllowedPorts {
if portNum == allowedPort {
allowed = true
break
}
}
}
if !allowed {
se.violations <- SecurityViolation{
ContainerID: container.ID,
ContainerName: container.Names[0],
ViolationType: "network_policy",
Severity: "MEDIUM",
Description: fmt.Sprintf("Port %d not allowed by network policy", portNum),
Timestamp: time.Now(),
Metadata: map[string]interface{}{
"port": portNum,
},
}
}
}
}
func (se *SecurityEngine) ProcessViolations(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case violation := <-se.violations:
se.handleViolation(violation)
}
}
}
func (se *SecurityEngine) handleViolation(violation SecurityViolation) {
// Log violation
log.Printf("Security Violation: %+v", violation)
// Send to external systems (SIEM, alerting, etc.)
se.sendToSIEM(violation)
// Take automated action based on severity
switch violation.Severity {
case "CRITICAL":
se.quarantineContainer(violation.ContainerID)
case "HIGH":
se.alertSecurityTeam(violation)
case "MEDIUM":
se.logForReview(violation)
}
}
func (se *SecurityEngine) quarantineContainer(containerID string) {
ctx := context.Background()
    // Stop the container (ContainerStop takes a *time.Duration in this SDK version)
    timeout := 30 * time.Second
    if err := se.dockerClient.ContainerStop(ctx, containerID, &timeout); err != nil {
log.Printf("Failed to stop container %s: %v", containerID, err)
}
log.Printf("Container %s quarantined due to critical security violation", containerID)
}
func (se *SecurityEngine) sendToSIEM(violation SecurityViolation) {
// Implementation for SIEM integration
// This could be Splunk, ELK, or other SIEM systems
}
func (se *SecurityEngine) alertSecurityTeam(violation SecurityViolation) {
// Implementation for alerting (Slack, PagerDuty, etc.)
}
func (se *SecurityEngine) logForReview(violation SecurityViolation) {
// Implementation for logging violations for manual review
}
func calculateCPUPercent(stats *types.StatsJSON) float64 {
    cpuDelta := float64(stats.CPUStats.CPUUsage.TotalUsage - stats.PreCPUStats.CPUUsage.TotalUsage)
    systemDelta := float64(stats.CPUStats.SystemUsage - stats.PreCPUStats.SystemUsage)
    // Prefer OnlineCPUs (populated on cgroup v2 hosts); fall back to the per-CPU sample count.
    onlineCPUs := float64(stats.CPUStats.OnlineCPUs)
    if onlineCPUs == 0 {
        onlineCPUs = float64(len(stats.CPUStats.CPUUsage.PercpuUsage))
    }
    if systemDelta > 0.0 && cpuDelta > 0.0 {
        return (cpuDelta / systemDelta) * onlineCPUs * 100.0
    }
    return 0.0
}
func matchesPattern(text, pattern string) bool {
    // Minimal glob support: "*" matches anything and a trailing "*" matches a prefix,
    // so the "v*" policy tag matches "v1.2.3". Use proper pattern matching in production.
    if pattern == "*" {
        return true
    }
    if strings.HasSuffix(pattern, "*") {
        return strings.HasPrefix(text, strings.TrimSuffix(pattern, "*"))
    }
    return text == pattern
}
func main() {
engine, err := NewSecurityEngine()
if err != nil {
log.Fatal(err)
}
ctx := context.Background()
// Start monitoring
go engine.MonitorContainers(ctx)
go engine.ProcessViolations(ctx)
// Start HTTP API
http.HandleFunc("/violations", func(w http.ResponseWriter, r *http.Request) {
    // Placeholder endpoint: reports engine status; extend it to return stored violations
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]string{"status": "ok"})
})
log.Println("Security Engine started on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
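The sendToSIEM, alertSecurityTeam, and logForReview hooks are left as stubs above so the engine can be wired into whatever tooling a team already runs. As a minimal sketch, assuming a SIEM that accepts JSON events over HTTP, the SIEM stub could be filled in roughly as follows; the SIEM_URL and SIEM_TOKEN environment variables are illustrative assumptions, and "bytes" and "os" would need to be added to the import block:

// sendToSIEM forwards a violation to an HTTP-based event collector.
// Adapt the payload and auth headers to whatever your SIEM (Splunk HEC, ELK, etc.) expects.
func (se *SecurityEngine) sendToSIEM(violation SecurityViolation) {
    siemURL := os.Getenv("SIEM_URL")
    if siemURL == "" {
        return // SIEM forwarding not configured
    }
    payload, err := json.Marshal(violation)
    if err != nil {
        log.Printf("Failed to marshal violation: %v", err)
        return
    }
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    req, err := http.NewRequestWithContext(ctx, http.MethodPost, siemURL, bytes.NewReader(payload))
    if err != nil {
        log.Printf("Failed to build SIEM request: %v", err)
        return
    }
    req.Header.Set("Content-Type", "application/json")
    if token := os.Getenv("SIEM_TOKEN"); token != "" {
        req.Header.Set("Authorization", "Bearer "+token)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        log.Printf("Failed to send violation to SIEM: %v", err)
        return
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        log.Printf("SIEM returned status %d", resp.StatusCode)
    }
}

Most JSON-over-HTTP collectors (Splunk HEC, Elasticsearch ingest endpoints) accept this delivery style, though their exact paths and authorization headers differ.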
Advanced Performance Profiling
Container Performance Analyzer
#!/usr/bin/env python3
# performance-analyzer.py
import asyncio
import docker
import psutil
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import numpy as np
import matplotlib.pyplot as plt
class PerformanceAnalyzer:
    def __init__(self):
        self.docker_client = docker.from_env()
        self.metrics_history = {}         # per-container samples keyed by container ID
        self.system_metrics_history = []  # host-level samples
        self.analysis_results = {}
def collect_system_metrics(self) -> Dict:
"""Collect system-wide performance metrics"""
return {
'timestamp': datetime.now().isoformat(),
'cpu': {
'percent': psutil.cpu_percent(interval=1),
'count': psutil.cpu_count(),
'freq': psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None,
'per_cpu': psutil.cpu_percent(interval=1, percpu=True),
'load_avg': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else None
},
'memory': {
'total': psutil.virtual_memory().total,
'available': psutil.virtual_memory().available,
'percent': psutil.virtual_memory().percent,
'used': psutil.virtual_memory().used,
'free': psutil.virtual_memory().free,
'buffers': psutil.virtual_memory().buffers,
'cached': psutil.virtual_memory().cached
},
'disk': {
'usage': {partition.mountpoint: psutil.disk_usage(partition.mountpoint)._asdict()
for partition in psutil.disk_partitions()},
'io': psutil.disk_io_counters()._asdict() if psutil.disk_io_counters() else None
},
'network': {
'io': psutil.net_io_counters()._asdict(),
'connections': len(psutil.net_connections())
}
}
    def collect_container_metrics(self, container) -> Optional[Dict]:
"""Collect detailed container performance metrics"""
try:
stats = container.stats(stream=False)
            # Calculate CPU percentage
            cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - \
                stats['precpu_stats']['cpu_usage']['total_usage']
            system_delta = stats['cpu_stats'].get('system_cpu_usage', 0) - \
                stats['precpu_stats'].get('system_cpu_usage', 0)
            # 'percpu_usage' is absent on cgroup v2 hosts, so prefer 'online_cpus'
            online_cpus = stats['cpu_stats'].get(
                'online_cpus',
                len(stats['cpu_stats']['cpu_usage'].get('percpu_usage', [])) or 1
            )
            cpu_percent = 0.0
            if system_delta > 0 and cpu_delta > 0:
                cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0
            # Memory metrics (guard against a missing or zero limit)
            memory_usage = stats['memory_stats'].get('usage', 0)
            memory_limit = stats['memory_stats'].get('limit', 0)
            memory_percent = (memory_usage / memory_limit) * 100.0 if memory_limit else 0.0
# Network metrics
networks = stats.get('networks', {})
total_rx_bytes = sum(net['rx_bytes'] for net in networks.values())
total_tx_bytes = sum(net['tx_bytes'] for net in networks.values())
            # Block I/O metrics ('io_service_bytes_recursive' can be None on cgroup v2)
            blkio_stats = stats.get('blkio_stats', {})
            io_service_bytes = blkio_stats.get('io_service_bytes_recursive') or []
            read_bytes = sum(entry['value'] for entry in io_service_bytes
                             if entry['op'].lower() == 'read')
            write_bytes = sum(entry['value'] for entry in io_service_bytes
                              if entry['op'].lower() == 'write')
return {
'timestamp': datetime.now().isoformat(),
'container_id': container.id,
'container_name': container.name,
'cpu': {
'percent': cpu_percent,
'usage': stats['cpu_stats']['cpu_usage']['total_usage'],
'system_usage': stats['cpu_stats']['system_cpu_usage'],
'throttling': stats['cpu_stats'].get('throttling_data', {})
},
'memory': {
'usage': memory_usage,
'limit': memory_limit,
'percent': memory_percent,
'cache': stats['memory_stats'].get('stats', {}).get('cache', 0),
'rss': stats['memory_stats'].get('stats', {}).get('rss', 0)
},
'network': {
'rx_bytes': total_rx_bytes,
'tx_bytes': total_tx_bytes,
'rx_packets': sum(net['rx_packets'] for net in networks.values()),
'tx_packets': sum(net['tx_packets'] for net in networks.values())
},
'blkio': {
'read_bytes': read_bytes,
'write_bytes': write_bytes
}
}
except Exception as e:
print(f"Error collecting metrics for container {container.name}: {e}")
return None
def analyze_performance_trends(self, container_id: str, hours: int = 24) -> Dict:
"""Analyze performance trends for a container"""
if container_id not in self.metrics_history:
return {"error": "No metrics history found"}
metrics = self.metrics_history[container_id]
cutoff_time = datetime.now() - timedelta(hours=hours)
# Filter recent metrics
recent_metrics = [m for m in metrics
if datetime.fromisoformat(m['timestamp']) > cutoff_time]
if not recent_metrics:
return {"error": "No recent metrics found"}
# Extract time series data
timestamps = [datetime.fromisoformat(m['timestamp']) for m in recent_metrics]
cpu_values = [m['cpu']['percent'] for m in recent_metrics]
memory_values = [m['memory']['percent'] for m in recent_metrics]
# Calculate statistics
analysis = {
'container_id': container_id,
'analysis_period': f"{hours} hours",
'sample_count': len(recent_metrics),
'cpu': {
'mean': np.mean(cpu_values),
'std': np.std(cpu_values),
'min': np.min(cpu_values),
'max': np.max(cpu_values),
'p95': np.percentile(cpu_values, 95),
'p99': np.percentile(cpu_values, 99)
},
'memory': {
'mean': np.mean(memory_values),
'std': np.std(memory_values),
'min': np.min(memory_values),
'max': np.max(memory_values),
'p95': np.percentile(memory_values, 95),
'p99': np.percentile(memory_values, 99)
}
}
# Detect anomalies
analysis['anomalies'] = self.detect_anomalies(recent_metrics)
# Performance recommendations
analysis['recommendations'] = self.generate_recommendations(analysis)
return analysis
def detect_anomalies(self, metrics: List[Dict]) -> List[Dict]:
"""Detect performance anomalies using statistical methods"""
anomalies = []
cpu_values = [m['cpu']['percent'] for m in metrics]
memory_values = [m['memory']['percent'] for m in metrics]
# CPU anomalies (values > 2 standard deviations from mean)
cpu_mean = np.mean(cpu_values)
cpu_std = np.std(cpu_values)
cpu_threshold = cpu_mean + 2 * cpu_std
for i, metric in enumerate(metrics):
if metric['cpu']['percent'] > cpu_threshold:
anomalies.append({
'type': 'cpu_spike',
'timestamp': metric['timestamp'],
'value': metric['cpu']['percent'],
'threshold': cpu_threshold,
'severity': 'high' if metric['cpu']['percent'] > cpu_mean + 3 * cpu_std else 'medium'
})
# Memory anomalies
memory_mean = np.mean(memory_values)
memory_std = np.std(memory_values)
memory_threshold = memory_mean + 2 * memory_std
for i, metric in enumerate(metrics):
if metric['memory']['percent'] > memory_threshold:
anomalies.append({
'type': 'memory_spike',
'timestamp': metric['timestamp'],
'value': metric['memory']['percent'],
'threshold': memory_threshold,
'severity': 'high' if metric['memory']['percent'] > memory_mean + 3 * memory_std else 'medium'
})
return anomalies
def generate_recommendations(self, analysis: Dict) -> List[str]:
"""Generate performance optimization recommendations"""
recommendations = []
cpu_stats = analysis['cpu']
memory_stats = analysis['memory']
# CPU recommendations
if cpu_stats['p95'] > 80:
recommendations.append("Consider increasing CPU limits or optimizing CPU-intensive operations")
if cpu_stats['std'] > 20:
recommendations.append("High CPU variance detected - investigate workload patterns")
# Memory recommendations
if memory_stats['p95'] > 85:
recommendations.append("Consider increasing memory limits or optimizing memory usage")
if memory_stats['max'] > 95:
recommendations.append("Memory usage approaching limits - risk of OOM kills")
# General recommendations
if len(analysis.get('anomalies', [])) > 10:
recommendations.append("Frequent anomalies detected - review application performance")
return recommendations
def generate_performance_report(self, container_id: str) -> str:
"""Generate comprehensive performance report"""
analysis = self.analyze_performance_trends(container_id)
if 'error' in analysis:
return f"Error generating report: {analysis['error']}"
report = f"""
Performance Analysis Report
Container ID: {container_id}
Analysis Period: {analysis['analysis_period']}
Sample Count: {analysis['sample_count']}
CPU Performance:
- Average: {analysis['cpu']['mean']:.2f}%
- 95th Percentile: {analysis['cpu']['p95']:.2f}%
- Maximum: {analysis['cpu']['max']:.2f}%
- Standard Deviation: {analysis['cpu']['std']:.2f}%
Memory Performance:
- Average: {analysis['memory']['mean']:.2f}%
- 95th Percentile: {analysis['memory']['p95']:.2f}%
- Maximum: {analysis['memory']['max']:.2f}%
- Standard Deviation: {analysis['memory']['std']:.2f}%
Anomalies Detected: {len(analysis['anomalies'])}
Recommendations:
"""
for i, rec in enumerate(analysis['recommendations'], 1):
report += f"{i}. {rec}\n"
return report
def create_performance_dashboard(self, container_id: str, output_file: str = "performance_dashboard.png"):
"""Create visual performance dashboard"""
if container_id not in self.metrics_history:
print("No metrics history found")
return
metrics = self.metrics_history[container_id]
# Extract data
timestamps = [datetime.fromisoformat(m['timestamp']) for m in metrics]
cpu_values = [m['cpu']['percent'] for m in metrics]
memory_values = [m['memory']['percent'] for m in metrics]
# Create dashboard
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 10))
# CPU usage over time
ax1.plot(timestamps, cpu_values, label='CPU %', color='blue')
ax1.set_title('CPU Usage Over Time')
ax1.set_ylabel('CPU %')
ax1.legend()
ax1.grid(True)
# Memory usage over time
ax2.plot(timestamps, memory_values, label='Memory %', color='red')
ax2.set_title('Memory Usage Over Time')
ax2.set_ylabel('Memory %')
ax2.legend()
ax2.grid(True)
# CPU distribution
ax3.hist(cpu_values, bins=30, alpha=0.7, color='blue')
ax3.set_title('CPU Usage Distribution')
ax3.set_xlabel('CPU %')
ax3.set_ylabel('Frequency')
# Memory distribution
ax4.hist(memory_values, bins=30, alpha=0.7, color='red')
ax4.set_title('Memory Usage Distribution')
ax4.set_xlabel('Memory %')
ax4.set_ylabel('Frequency')
plt.tight_layout()
plt.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"Performance dashboard saved to {output_file}")
async def continuous_monitoring(self, duration_hours: int = 24):
"""Run continuous performance monitoring"""
end_time = datetime.now() + timedelta(hours=duration_hours)
while datetime.now() < end_time:
            # Collect and retain system-wide metrics alongside per-container history
            self.system_metrics_history.append(self.collect_system_metrics())
# Collect container metrics
for container in self.docker_client.containers.list():
container_metrics = self.collect_container_metrics(container)
if container_metrics:
container_id = container.id
if container_id not in self.metrics_history:
self.metrics_history[container_id] = []
self.metrics_history[container_id].append(container_metrics)
# Keep only last 1000 metrics per container
if len(self.metrics_history[container_id]) > 1000:
self.metrics_history[container_id] = self.metrics_history[container_id][-1000:]
# Wait before next collection
await asyncio.sleep(30) # Collect every 30 seconds
print(f"Monitoring completed after {duration_hours} hours")
if __name__ == "__main__":
analyzer = PerformanceAnalyzer()
# Run continuous monitoring for 1 hour
asyncio.run(analyzer.continuous_monitoring(duration_hours=1))
# Generate reports for all monitored containers
for container_id in analyzer.metrics_history:
print(analyzer.generate_performance_report(container_id))
analyzer.create_performance_dashboard(container_id, f"dashboard_{container_id[:12]}.png")
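The analyzer keeps its metrics history only in memory, so reports and dashboards cannot be regenerated after the process exits. A minimal sketch of one way to persist the collected samples between runs; the export_metrics_history/import_metrics_history helpers and the metrics_export directory are assumptions added here, not part of the analyzer above:

import json
from pathlib import Path

def export_metrics_history(analyzer: PerformanceAnalyzer,
                           output_dir: str = "metrics_export") -> None:
    """Write each container's collected samples to a JSON file for offline analysis."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    for container_id, samples in analyzer.metrics_history.items():
        # Short IDs keep filenames readable; the samples are plain JSON-serializable dicts
        (out / f"{container_id[:12]}.json").write_text(json.dumps(samples, indent=2))

def import_metrics_history(analyzer: PerformanceAnalyzer,
                           input_dir: str = "metrics_export") -> None:
    """Reload previously exported samples so trend analysis can run without re-collecting."""
    for path in Path(input_dir).glob("*.json"):
        analyzer.metrics_history[path.stem] = json.loads(path.read_text())

After a monitoring run, export_metrics_history(analyzer) snapshots the raw samples to disk; a later session can call import_metrics_history(analyzer) and regenerate reports with generate_performance_report() without collecting new data.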
Summary
This section covered advanced security and optimization techniques:
Custom Security Solutions
- Runtime Security Engine: Go-based security policy enforcement with continuous container monitoring
- Policy Framework: Comprehensive image, runtime, and network policy definitions
- Automated Response: Container quarantine and security team alerting
Advanced Performance Analysis
- Performance Analyzer: Python-based comprehensive metrics collection and analysis
- Anomaly Detection: Statistical methods for identifying performance issues
- Trend Analysis: Historical performance analysis with recommendations
Enterprise Patterns
- Security Automation: Policy-driven security enforcement and violation handling
- Performance Intelligence: Data-driven optimization recommendations derived from collected metrics
- Continuous Monitoring: Ongoing security and performance assessment of running workloads
Next Steps: Part 5 demonstrates complete production implementations combining all these advanced techniques into enterprise-ready security and optimization solutions.