Advanced Networking and Storage Techniques

This section explores sophisticated Docker networking and storage patterns including custom plugins, software-defined networking, and enterprise-grade storage solutions.

Custom Network Plugins

CNI Plugin Development

// cni-plugin/main.go
package main

import (
    "encoding/json"
    "fmt"
    "net"
    "os"
    
    "github.com/containernetworking/cni/pkg/skel"
    "github.com/containernetworking/cni/pkg/types"
    "github.com/containernetworking/cni/pkg/version"
)

// NetConf is the plugin configuration decoded from the JSON document that
// the runtime supplies on stdin. It embeds the standard CNI types.NetConf
// and adds the settings this plugin reads itself.
type NetConf struct {
    types.NetConf

    // Bridge names the host bridge the plugin creates and attaches to.
    Bridge string `json:"bridge"`
    // Subnet is the value handed to allocateIP.
    Subnet string `json:"subnet"`
    // Gateway is the gateway address used for the container interface and
    // reported in the result.
    Gateway string `json:"gateway"`
    // IPAM holds the "ipam" section of the configuration.
    IPAM struct {
        Type   string `json:"type"`
        Subnet string `json:"subnet"`
    } `json:"ipam"`
}

// cmdAdd handles the CNI ADD command. It decodes the network configuration
// from args.StdinData, ensures the bridge exists, allocates an address,
// creates a veth pair, attaches the host end to the bridge, moves the
// container end into args.Netns, configures it, and prints the result.
//
// The prefix length reported in the result is taken from conf.Subnet; when no
// subnet is configured it falls back to /24. If any step after the veth pair
// has been created fails, the pair is removed again (best effort) so that a
// failed ADD does not leave interfaces behind.
//
// It returns an error describing the first step that failed.
func cmdAdd(args *skel.CmdArgs) error {
    conf := NetConf{}
    if err := json.Unmarshal(args.StdinData, &conf); err != nil {
        return fmt.Errorf("failed to parse network configuration: %v", err)
    }

    // Derive the netmask from the configured subnet instead of assuming /24.
    mask := net.CIDRMask(24, 32)
    if conf.Subnet != "" {
        _, subnet, err := net.ParseCIDR(conf.Subnet)
        if err != nil {
            return fmt.Errorf("invalid subnet %q: %v", conf.Subnet, err)
        }
        mask = subnet.Mask
    }

    // Create bridge if it doesn't exist
    if err := createBridge(conf.Bridge); err != nil {
        return fmt.Errorf("failed to create bridge: %v", err)
    }

    // Allocate IP address
    ip, err := allocateIP(conf.Subnet)
    if err != nil {
        return fmt.Errorf("failed to allocate IP: %v", err)
    }

    // Create veth pair
    hostVeth, containerVeth, err := createVethPair(args.ContainerID)
    if err != nil {
        return fmt.Errorf("failed to create veth pair: %v", err)
    }

    // From here on a failure must undo the veth pair. The cleanup error is
    // deliberately dropped so the caller sees the original failure.
    succeeded := false
    defer func() {
        if !succeeded {
            _ = deleteVethPair(args.ContainerID)
        }
    }()

    // Attach host veth to bridge
    if err := attachToBridge(hostVeth, conf.Bridge); err != nil {
        return fmt.Errorf("failed to attach to bridge: %v", err)
    }

    // Move container veth to container namespace
    if err := moveToNamespace(containerVeth, args.Netns); err != nil {
        return fmt.Errorf("failed to move to namespace: %v", err)
    }

    // Configure container interface
    if err := configureInterface(containerVeth, ip, conf.Gateway, args.Netns); err != nil {
        return fmt.Errorf("failed to configure interface: %v", err)
    }

    // Return result
    result := &types.Result{
        IP4: &types.IPConfig{
            IP:      net.IPNet{IP: ip, Mask: mask},
            Gateway: net.ParseIP(conf.Gateway),
        },
    }

    if err := result.Print(); err != nil {
        return err
    }
    succeeded = true
    return nil
}

// cmdDel handles the CNI DEL command by deleting the veth pair that belongs
// to the container identified by args.ContainerID.
func cmdDel(args *skel.CmdArgs) error {
    containerID := args.ContainerID
    return deleteVethPair(containerID)
}

func main() {
    skel.PluginMain(cmdAdd, cmdDel, version.All)
}

// Helper functions
// createBridge is the hook for making sure the bridge called name exists.
// It is a placeholder: no bridge is created and nil is always returned.
func createBridge(name string) error {
    return nil
}

// allocateIP is the hook for choosing an address from subnet.
// It is a placeholder: subnet is ignored and the fixed address
// 192.168.1.100 is returned with a nil error.
func allocateIP(subnet string) (net.IP, error) {
    fixed := net.IPv4(192, 168, 1, 100)
    return fixed, nil
}

// createVethPair derives the interface names for a container's veth pair.
//
// The names are "veth" and "eth" followed by at most the first eight bytes of
// containerID. IDs shorter than eight bytes are used whole rather than being
// sliced out of range, and an empty ID is rejected with an error.
//
// It returns the host-side name, the container-side name, and an error.
// No interface is actually created yet; only the names are computed.
func createVethPair(containerID string) (string, string, error) {
    if containerID == "" {
        return "", "", fmt.Errorf("container ID must not be empty")
    }

    suffix := containerID
    if len(suffix) > 8 {
        suffix = suffix[:8]
    }

    hostVeth := "veth" + suffix
    containerVeth := "eth" + suffix
    return hostVeth, containerVeth, nil
}

// attachToBridge is the hook for enslaving the interface veth to bridge.
// It is a placeholder: nothing is attached and nil is always returned.
func attachToBridge(veth, bridge string) error {
    return nil
}

// moveToNamespace is the hook for moving the interface veth into the network
// namespace netns. It is a placeholder: nothing is moved and nil is always
// returned.
func moveToNamespace(veth, netns string) error {
    return nil
}

// configureInterface is the hook for assigning ip and gateway to iface inside
// the namespace netns. It is a placeholder: nothing is configured and nil is
// always returned.
func configureInterface(iface string, ip net.IP, gateway string, netns string) error {
    return nil
}

// deleteVethPair is the hook for removing the veth pair that belongs to
// containerID. It is a placeholder: nothing is removed and nil is always
// returned.
func deleteVethPair(containerID string) error {
    return nil
}

Docker Network Plugin

// docker-plugin/main.go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    
    "github.com/docker/go-plugins-helpers/network"
)

// CustomDriver is the network driver served by this plugin. All of its state
// lives in memory.
type CustomDriver struct {
    // networks holds the recorded state of each network, keyed by network ID.
    networks map[string]*NetworkState
}

// NetworkState is what the driver remembers about one network.
type NetworkState struct {
    // ID is the network ID supplied when the network was created.
    ID string
    // Name is a label for the network.
    Name string
    // Subnet is the subnet option the network was created with.
    Subnet string
    // Gateway is the gateway option the network was created with.
    Gateway string
    // Bridge is the name of the bridge backing the network.
    Bridge string
}

// GetCapabilities reports the driver's capabilities: both the scope and the
// connectivity scope are network.LocalScope. It never returns an error.
func (d *CustomDriver) GetCapabilities() (*network.CapabilitiesResponse, error) {
    caps := new(network.CapabilitiesResponse)
    caps.Scope = network.LocalScope
    caps.ConnectivityScope = network.LocalScope
    return caps, nil
}

// CreateNetwork records a new network and creates its infrastructure.
//
// The "subnet" and "gateway" entries of req.Options are read as strings; a
// missing or non-string entry is treated as empty. The bridge name is "br-"
// followed by at most the first twelve bytes of the network ID, so IDs
// shorter than twelve bytes no longer cause an out-of-range slice.
//
// The network is added to d.networks only after the infrastructure has been
// created successfully, so a failed call leaves no stale entry behind.
// It returns an error for an empty network ID or when infrastructure creation
// fails.
func (d *CustomDriver) CreateNetwork(req *network.CreateNetworkRequest) error {
    if req.NetworkID == "" {
        return fmt.Errorf("network ID must not be empty")
    }

    // NOTE(review): go-plugins-helpers appears to declare Options as
    // map[string]interface{}; the assertion below works for that and for a
    // plain map[string]string. Confirm against the vendored version.
    option := func(key string) string {
        value, _ := interface{}(req.Options[key]).(string)
        return value
    }

    shortID := req.NetworkID
    if len(shortID) > 12 {
        shortID = shortID[:12]
    }

    state := &NetworkState{
        ID:      req.NetworkID,
        Name:    req.NetworkID,
        Subnet:  option("subnet"),
        Gateway: option("gateway"),
        Bridge:  fmt.Sprintf("br-%s", shortID),
    }

    // Create actual network infrastructure before publishing the state.
    if err := d.createNetworkInfrastructure(state); err != nil {
        return err
    }

    d.networks[req.NetworkID] = state
    return nil
}

// DeleteNetwork tears down the infrastructure of the network named by
// req.NetworkID and then forgets it. It returns an error if the network is
// unknown or if the teardown fails; in the latter case the network stays
// registered.
func (d *CustomDriver) DeleteNetwork(req *network.DeleteNetworkRequest) error {
    id := req.NetworkID

    state, known := d.networks[id]
    if !known {
        return fmt.Errorf("network %s not found", id)
    }

    err := d.deleteNetworkInfrastructure(state)
    if err == nil {
        delete(d.networks, id)
    }
    return err
}

// CreateEndpoint allocates an address for a new endpoint on the network named
// by req.NetworkID and returns it together with a MAC address.
//
// It returns an error when the network is unknown (previously this
// dereferenced a nil state and panicked) or when address allocation fails.
func (d *CustomDriver) CreateEndpoint(req *network.CreateEndpointRequest) (*network.CreateEndpointResponse, error) {
    state, exists := d.networks[req.NetworkID]
    if !exists {
        return nil, fmt.Errorf("network %s not found", req.NetworkID)
    }

    // Create endpoint (veth pair, IP allocation, etc.)
    ip, err := d.allocateIP(state.Subnet)
    if err != nil {
        return nil, err
    }

    return &network.CreateEndpointResponse{
        Interface: &network.EndpointInterface{
            Address:    ip,
            MacAddress: generateMAC(),
        },
    }, nil
}

// Join connects the endpoint req.EndpointID to the network req.NetworkID.
// It creates a veth pair, attaches the host end to the network's bridge, and
// tells the caller to use the container end with the "eth" prefix and the
// network's gateway.
//
// It returns an error when the network is unknown (previously this
// dereferenced a nil state and panicked), when the veth pair cannot be
// created, or when the bridge attachment fails. In the last case the veth
// pair is removed again on a best-effort basis.
func (d *CustomDriver) Join(req *network.JoinRequest) (*network.JoinResponse, error) {
    state, exists := d.networks[req.NetworkID]
    if !exists {
        return nil, fmt.Errorf("network %s not found", req.NetworkID)
    }

    // Create veth pair and configure
    hostVeth, containerVeth, err := d.createVethPair(req.EndpointID)
    if err != nil {
        return nil, err
    }

    // Attach to bridge
    if err := d.attachToBridge(hostVeth, state.Bridge); err != nil {
        // Undo the veth pair; the attach error is the one worth reporting.
        _ = d.deleteVethPair(req.EndpointID)
        return nil, err
    }

    return &network.JoinResponse{
        InterfaceName: network.InterfaceName{
            SrcName:   containerVeth,
            DstPrefix: "eth",
        },
        Gateway: state.Gateway,
    }, nil
}

// Leave disconnects an endpoint by deleting the veth pair associated with
// req.EndpointID and returns the result of that deletion.
func (d *CustomDriver) Leave(req *network.LeaveRequest) error {
    endpointID := req.EndpointID
    return d.deleteVethPair(endpointID)
}

// DeleteEndpoint is the hook for discarding per-endpoint state. The driver
// keeps none, so the request is accepted and nil is returned.
func (d *CustomDriver) DeleteEndpoint(req *network.DeleteEndpointRequest) error {
    return nil
}

// main builds a CustomDriver with an empty network table, wraps it in the
// plugin handler, and serves it on TCP port 8080. A listener failure is no
// longer dropped silently: the process panics with the error.
func main() {
    driver := &CustomDriver{
        networks: make(map[string]*NetworkState),
    }

    handler := network.NewHandler(driver)

    // NOTE(review): go-plugins-helpers handlers normally expose their own
    // ServeTCP/ServeUnix methods; confirm the handler can be passed to
    // http.ListenAndServe with the vendored version.
    if err := http.ListenAndServe(":8080", handler); err != nil {
        panic(err)
    }
}

// Helper methods
// createNetworkInfrastructure is the hook for building whatever backs the
// network described by state (bridges, iptables rules, and so on). It is a
// placeholder: nothing is created and nil is always returned.
func (d *CustomDriver) createNetworkInfrastructure(state *NetworkState) error {
    return nil
}

// deleteNetworkInfrastructure is the hook for tearing down whatever backs the
// network described by state. It is a placeholder: nothing is removed and nil
// is always returned.
func (d *CustomDriver) deleteNetworkInfrastructure(state *NetworkState) error {
    return nil
}

// allocateIP is the hook for choosing an address from subnet. It is a
// placeholder: subnet is ignored and the fixed CIDR string
// "192.168.1.100/24" is returned with a nil error.
func (d *CustomDriver) allocateIP(subnet string) (string, error) {
    const fixedAddress = "192.168.1.100/24"
    return fixedAddress, nil
}

// createVethPair derives the interface names for an endpoint's veth pair.
//
// The names are "veth" and "eth" followed by at most the first eight bytes of
// endpointID. IDs shorter than eight bytes are used whole rather than being
// sliced out of range, and an empty ID is rejected with an error.
//
// It returns the host-side name, the container-side name, and an error.
// No interface is actually created yet; only the names are computed.
func (d *CustomDriver) createVethPair(endpointID string) (string, string, error) {
    if endpointID == "" {
        return "", "", fmt.Errorf("endpoint ID must not be empty")
    }

    suffix := endpointID
    if len(suffix) > 8 {
        suffix = suffix[:8]
    }
    return "veth" + suffix, "eth" + suffix, nil
}

// attachToBridge is the hook for enslaving the interface veth to bridge.
// It is a placeholder: nothing is attached and nil is always returned.
func (d *CustomDriver) attachToBridge(veth, bridge string) error {
    return nil
}

// deleteVethPair is the hook for removing the veth pair that belongs to
// endpointID. It is a placeholder: nothing is removed and nil is always
// returned.
func (d *CustomDriver) deleteVethPair(endpointID string) error {
    return nil
}

// generateMAC is the hook for producing an endpoint MAC address. It is a
// placeholder that returns the same fixed address on every call.
func generateMAC() string {
    const fixedMAC = "02:42:ac:11:00:02"
    return fixedMAC
}

Software-Defined Networking

Open vSwitch (OVS) Integration

# docker-compose.ovs.yml
#
# Runs the Open vSwitch database server and switch daemon as two containers,
# an SDN controller, and two sample application containers on an OVS-backed
# network.
version: '3.8'

services:
  # Open vSwitch database server (ovsdb-server)
  ovs-db:
    image: openvswitch/ovs:latest
    privileged: true
    networks:
      - ovs-control
    volumes:
      - ovs-db-data:/var/lib/openvswitch
      # Shared runtime directory: the db.sock created by ovsdb-server below
      # must be visible to the ovs-vswitchd container.
      - ovs-run:/var/run/openvswitch
      - /sys/fs/cgroup:/sys/fs/cgroup:ro
    # The server detaches, so "tail -f /dev/null" keeps the container alive.
    command: |
      sh -c "
        ovsdb-server --remote=punix:/var/run/openvswitch/db.sock \
                     --remote=db:Open_vSwitch,Open_vSwitch,manager_options \
                     --pidfile --detach
        ovs-vsctl --no-wait init
        tail -f /dev/null
      "

  # Open vSwitch switch daemon (ovs-vswitchd)
  ovs-vswitchd:
    image: openvswitch/ovs:latest
    privileged: true
    networks:
      - ovs-control
    volumes:
      - ovs-db-data:/var/lib/openvswitch
      # Same runtime directory as ovs-db so the daemon can reach db.sock.
      - ovs-run:/var/run/openvswitch
      - /sys/fs/cgroup:/sys/fs/cgroup:ro
    depends_on:
      - ovs-db
    # The daemon detaches, so "tail -f /dev/null" keeps the container alive.
    command: |
      sh -c "
        ovs-vswitchd --pidfile --detach
        tail -f /dev/null
      "

  # SDN Controller (Floodlight)
  sdn-controller:
    image: floodlight/floodlight:latest
    ports:
      - "8080:8080"
      - "6653:6653"
    networks:
      - ovs-control
    volumes:
      - ./floodlight/floodlightdefault.properties:/opt/floodlight/src/main/resources/floodlightdefault.properties:ro

  # Application containers using OVS
  app1:
    image: alpine
    networks:
      - ovs-network
    command: sleep 3600

  app2:
    image: alpine
    networks:
      - ovs-network
    command: sleep 3600

networks:
  ovs-control:
    driver: bridge
  # Requires a Docker network plugin registered under the name "ovs".
  ovs-network:
    driver: ovs
    driver_opts:
      ovs.bridge.name: br-ovs
      ovs.bridge.controller: tcp:sdn-controller:6653

volumes:
  ovs-db-data:
  ovs-run:

Network Function Virtualization

# docker-compose.nfv.yml
#
# Network Function Virtualization stack: a firewall, load balancer and router
# built from local ./nfv/* contexts, plus Prometheus and an orchestrator.
# All services join nfv-mgmt; the three network functions also join nfv-data.
version: '3.8'

services:
  # Virtual Firewall
  # Runs privileged; rules are mounted read-only and their path is passed in
  # through RULES_FILE.
  virtual-firewall:
    build: ./nfv/firewall
    privileged: true
    networks:
      - nfv-mgmt
      - nfv-data
    volumes:
      - ./firewall/rules.conf:/etc/firewall/rules.conf:ro
    environment:
      - INTERFACES=eth0,eth1
      - RULES_FILE=/etc/firewall/rules.conf

  # Virtual Load Balancer
  # Publishes ports 80 and 443 on the host; HAProxy config is mounted
  # read-only.
  virtual-lb:
    build: ./nfv/loadbalancer
    networks:
      - nfv-mgmt
      - nfv-data
    volumes:
      - ./lb/haproxy.cfg:/etc/haproxy/haproxy.cfg:ro
    ports:
      - "80:80"
      - "443:443"

  # Virtual Router
  # Runs privileged, is the only service attached to the external network,
  # and has IPv4 forwarding switched on via sysctl.
  virtual-router:
    build: ./nfv/router
    privileged: true
    networks:
      - nfv-mgmt
      - nfv-data
      - external
    volumes:
      - ./router/quagga.conf:/etc/quagga/quagga.conf:ro
    sysctls:
      - net.ipv4.ip_forward=1

  # Network Monitoring
  # Prometheus on the management network only; its data lives in the
  # prometheus-data named volume.
  network-monitor:
    image: prom/prometheus
    networks:
      - nfv-mgmt
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus

  # Service Orchestrator
  # Mounts the host's Docker socket (read-write) and points DOCKER_HOST at it.
  nfv-orchestrator:
    build: ./nfv/orchestrator
    networks:
      - nfv-mgmt
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - ./orchestrator/config.yaml:/etc/orchestrator/config.yaml:ro
    environment:
      - DOCKER_HOST=unix:///var/run/docker.sock

networks:
  # Management network with a fixed 10.0.1.0/24 subnet.
  nfv-mgmt:
    driver: bridge
    ipam:
      config:
        - subnet: 10.0.1.0/24
  # Data network with a fixed 10.0.2.0/24 subnet.
  nfv-data:
    driver: bridge
    ipam:
      config:
        - subnet: 10.0.2.0/24
  # Bridge network named "external"; no subnet is pinned.
  external:
    driver: bridge

volumes:
  prometheus-data:

Advanced Storage Drivers

Custom Storage Plugin

// storage-plugin/main.go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "path/filepath"
    
    "github.com/docker/go-plugins-helpers/volume"
)

// CustomVolumeDriver is the volume driver served by this plugin. Volume
// bookkeeping is held in memory.
type CustomVolumeDriver struct {
    // volumes holds the recorded state of each volume, keyed by volume name.
    volumes map[string]*VolumeState
    // root is the directory under which each volume's directory is created.
    root string
}

// VolumeState is what the driver remembers about one volume.
type VolumeState struct {
    // Name is the volume name supplied at creation.
    Name string
    // Path is the volume's backing directory.
    Path string
    // Options are the options the volume was created with.
    Options map[string]string
    // Mountpoint is where the volume is currently mounted; empty when it is
    // not mounted.
    Mountpoint string
}

// Create provisions a new volume: it creates the volume's directory under
// d.root, applies the setup selected by the "type" option ("encrypted",
// "compressed" or "replicated"; any other value needs no extra setup), and
// records the volume in d.volumes.
//
// req.Name must be a single path element. Empty names, "." and "..", and
// names containing a path separator are rejected so a volume can never be
// created outside d.root. The "size" option is accepted but not applied.
//
// It returns an error for an invalid name, when the directory cannot be
// created, or when the type-specific setup fails.
func (d *CustomVolumeDriver) Create(req *volume.CreateRequest) error {
    // The name is joined into a filesystem path, so refuse anything that
    // could climb out of the root directory.
    if req.Name == "" || req.Name == "." || req.Name == ".." || filepath.Base(req.Name) != req.Name {
        return fmt.Errorf("invalid volume name %q", req.Name)
    }

    // Parse options
    volumeType := req.Options["type"]
    encryption := req.Options["encryption"]

    // Create volume directory
    volumePath := filepath.Join(d.root, req.Name)
    if err := os.MkdirAll(volumePath, 0755); err != nil {
        return err
    }

    // Apply volume-specific configuration
    switch volumeType {
    case "encrypted":
        if err := d.setupEncryption(volumePath, encryption); err != nil {
            return err
        }
    case "compressed":
        if err := d.setupCompression(volumePath); err != nil {
            return err
        }
    case "replicated":
        if err := d.setupReplication(volumePath, req.Options); err != nil {
            return err
        }
    }

    // Store volume state
    d.volumes[req.Name] = &VolumeState{
        Name:    req.Name,
        Path:    volumePath,
        Options: req.Options,
    }

    return nil
}

// Remove deletes the volume named req.Name: it runs the driver's cleanup
// hook, removes the volume directory, and drops the volume from d.volumes.
// It returns an error if the volume is unknown or if either step fails, in
// which case the volume stays registered.
func (d *CustomVolumeDriver) Remove(req *volume.RemoveRequest) error {
    state, known := d.volumes[req.Name]
    if !known {
        return fmt.Errorf("volume %s not found", req.Name)
    }

    // Cleanup first, then the directory; stop at the first failure.
    err := d.cleanupVolume(state)
    if err == nil {
        err = os.RemoveAll(state.Path)
    }
    if err != nil {
        return err
    }

    delete(d.volumes, req.Name)
    return nil
}

// Mount makes the volume named req.Name available at /mnt/<name>. It creates
// the mount directory, runs the driver's mount hook, records the mountpoint
// on the volume state, and returns it. It returns an error if the volume is
// unknown, the directory cannot be created, or the mount hook fails.
func (d *CustomVolumeDriver) Mount(req *volume.MountRequest) (*volume.MountResponse, error) {
    name := req.Name

    state, known := d.volumes[name]
    if !known {
        return nil, fmt.Errorf("volume %s not found", name)
    }

    // Prepare the directory, then mount into it; stop at the first failure.
    target := filepath.Join("/mnt", name)
    err := os.MkdirAll(target, 0755)
    if err == nil {
        err = d.mountVolume(state, target)
    }
    if err != nil {
        return nil, err
    }

    state.Mountpoint = target
    return &volume.MountResponse{Mountpoint: target}, nil
}

// Unmount runs the driver's unmount hook for the volume named req.Name and,
// on success, clears its recorded mountpoint. It returns an error if the
// volume is unknown or the hook fails.
func (d *CustomVolumeDriver) Unmount(req *volume.UnmountRequest) error {
    state, known := d.volumes[req.Name]
    if !known {
        return fmt.Errorf("volume %s not found", req.Name)
    }

    err := d.unmountVolume(state)
    if err == nil {
        state.Mountpoint = ""
    }
    return err
}

// Path reports the recorded mountpoint of the volume named req.Name, which is
// empty while the volume is not mounted. It returns an error if the volume is
// unknown.
func (d *CustomVolumeDriver) Path(req *volume.PathRequest) (*volume.PathResponse, error) {
    if state, known := d.volumes[req.Name]; known {
        return &volume.PathResponse{Mountpoint: state.Mountpoint}, nil
    }
    return nil, fmt.Errorf("volume %s not found", req.Name)
}

// Get returns the name and recorded mountpoint of the volume named req.Name.
// It returns an error if the volume is unknown.
func (d *CustomVolumeDriver) Get(req *volume.GetRequest) (*volume.GetResponse, error) {
    state, known := d.volumes[req.Name]
    if !known {
        return nil, fmt.Errorf("volume %s not found", req.Name)
    }

    found := &volume.Volume{
        Name:       state.Name,
        Mountpoint: state.Mountpoint,
    }
    return &volume.GetResponse{Volume: found}, nil
}

// List returns the name and recorded mountpoint of every known volume, in map
// iteration order. With no volumes the returned slice is nil. It never
// returns an error.
func (d *CustomVolumeDriver) List() (*volume.ListResponse, error) {
    var known []*volume.Volume

    for name := range d.volumes {
        state := d.volumes[name]
        entry := &volume.Volume{
            Name:       state.Name,
            Mountpoint: state.Mountpoint,
        }
        known = append(known, entry)
    }

    return &volume.ListResponse{Volumes: known}, nil
}

// Capabilities reports the driver's capabilities: its scope is "local".
func (d *CustomVolumeDriver) Capabilities() *volume.CapabilitiesResponse {
    var resp volume.CapabilitiesResponse
    resp.Capabilities.Scope = "local"
    return &resp
}

// Helper methods
// setupEncryption is the hook for encrypting the volume at path with the
// given algorithm. It is a placeholder: nothing is set up and nil is always
// returned.
func (d *CustomVolumeDriver) setupEncryption(path, algorithm string) error {
    return nil
}

// setupCompression is the hook for enabling compression on the volume at
// path. It is a placeholder: nothing is set up and nil is always returned.
func (d *CustomVolumeDriver) setupCompression(path string) error {
    return nil
}

// setupReplication is the hook for configuring replication of the volume at
// path from the creation options. It is a placeholder: nothing is set up and
// nil is always returned.
func (d *CustomVolumeDriver) setupReplication(path string, options map[string]string) error {
    return nil
}

// cleanupVolume is the hook for releasing whatever resources back the volume
// described by state. It is a placeholder: nothing is released and nil is
// always returned.
func (d *CustomVolumeDriver) cleanupVolume(state *VolumeState) error {
    return nil
}

// mountVolume is the hook for mounting the volume described by state at
// mountpoint. It is a placeholder: nothing is mounted and nil is always
// returned.
func (d *CustomVolumeDriver) mountVolume(state *VolumeState, mountpoint string) error {
    return nil
}

// unmountVolume is the hook for unmounting the volume described by state.
// It is a placeholder: nothing is unmounted and nil is always returned.
func (d *CustomVolumeDriver) unmountVolume(state *VolumeState) error {
    return nil
}

// main builds a CustomVolumeDriver rooted at /var/lib/custom-volumes, wraps
// it in the plugin handler, and serves it on TCP port 8080. A listener
// failure is no longer dropped silently: it is written to stderr and the
// process exits with status 1.
func main() {
    driver := &CustomVolumeDriver{
        volumes: make(map[string]*VolumeState),
        root:    "/var/lib/custom-volumes",
    }

    handler := volume.NewHandler(driver)

    // NOTE(review): go-plugins-helpers handlers normally expose their own
    // ServeTCP/ServeUnix methods; confirm the handler can be passed to
    // http.ListenAndServe with the vendored version.
    if err := http.ListenAndServe(":8080", handler); err != nil {
        fmt.Fprintf(os.Stderr, "volume plugin server stopped: %v\n", err)
        os.Exit(1)
    }
}

Container Storage Interface (CSI)

CSI Driver Implementation

// csi-driver/main.go
package main

import (
    "context"
    "fmt"
    "net"
    "time"

    "github.com/container-storage-interface/spec/lib/go/csi"
    "google.golang.org/grpc"
)

// CSIDriver is the receiver for the CSI Identity, Controller and Node RPCs
// implemented in this file.
type CSIDriver struct {
    // name is the plugin name reported by GetPluginInfo.
    name string
    // version is the vendor version reported by GetPluginInfo.
    version string
    // nodeID is the node identifier reported by NodeGetInfo.
    nodeID string
}

// Identity Service
// GetPluginInfo reports the driver's name and vendor version. It never
// returns an error.
func (d *CSIDriver) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
    info := &csi.GetPluginInfoResponse{}
    info.Name = d.name
    info.VendorVersion = d.version
    return info, nil
}

// GetPluginCapabilities advertises a single plugin capability: the
// controller service. It never returns an error.
func (d *CSIDriver) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
    controllerService := &csi.PluginCapability{
        Type: &csi.PluginCapability_Service_{
            Service: &csi.PluginCapability_Service{
                Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
            },
        },
    }

    resp := &csi.GetPluginCapabilitiesResponse{
        Capabilities: []*csi.PluginCapability{controllerService},
    }
    return resp, nil
}

// Probe answers a probe request with an empty response and no error.
func (d *CSIDriver) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) {
    resp := &csi.ProbeResponse{}
    return resp, nil
}

// Controller Service
// CreateVolume generates a new volume ID, creates the backing volume using
// the "type" parameter and the requested capacity, and returns the new
// volume's ID, capacity and context. It returns the backend's error if
// creation fails.
func (d *CSIDriver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
    id := generateVolumeID()

    params := req.GetParameters()
    requiredBytes := req.GetCapacityRange().GetRequiredBytes()

    backend, err := d.createVolumeBackend(id, params["type"], requiredBytes, params)
    if err != nil {
        return nil, err
    }

    created := &csi.Volume{
        VolumeId:      id,
        CapacityBytes: requiredBytes,
        VolumeContext: backend.Context,
    }
    return &csi.CreateVolumeResponse{Volume: created}, nil
}

// DeleteVolume deletes the backing volume identified by the request's volume
// ID. It returns the backend's error if deletion fails.
func (d *CSIDriver) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
    err := d.deleteVolumeBackend(req.GetVolumeId())
    if err != nil {
        return nil, err
    }
    return &csi.DeleteVolumeResponse{}, nil
}

// ControllerPublishVolume attaches the requested volume to the requested node
// and returns the resulting publish context. It returns the attach error if
// the attachment fails.
func (d *CSIDriver) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
    attachInfo, err := d.attachVolumeToNode(req.GetVolumeId(), req.GetNodeId())
    if err != nil {
        return nil, err
    }

    resp := &csi.ControllerPublishVolumeResponse{}
    resp.PublishContext = attachInfo
    return resp, nil
}

// Node Service
// NodeStageVolume stages the requested volume at the request's staging target
// path, passing along the publish context. It returns the staging error if
// that fails.
func (d *CSIDriver) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
    id := req.GetVolumeId()
    staging := req.GetStagingTargetPath()

    err := d.stageVolume(id, staging, req.GetPublishContext())
    if err != nil {
        return nil, err
    }
    return &csi.NodeStageVolumeResponse{}, nil
}

// NodePublishVolume publishes the requested volume from its staging target
// path to the request's target path. It returns the publish error if that
// fails.
func (d *CSIDriver) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
    id := req.GetVolumeId()
    target := req.GetTargetPath()
    staging := req.GetStagingTargetPath()

    err := d.publishVolume(id, staging, target)
    if err != nil {
        return nil, err
    }
    return &csi.NodePublishVolumeResponse{}, nil
}

// NodeGetCapabilities advertises a single node capability: the
// stage/unstage volume RPC. It never returns an error.
func (d *CSIDriver) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
    stageUnstage := &csi.NodeServiceCapability{
        Type: &csi.NodeServiceCapability_Rpc{
            Rpc: &csi.NodeServiceCapability_RPC{
                Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
            },
        },
    }

    resp := &csi.NodeGetCapabilitiesResponse{
        Capabilities: []*csi.NodeServiceCapability{stageUnstage},
    }
    return resp, nil
}

// NodeGetInfo reports the driver's configured node ID. It never returns an
// error.
func (d *CSIDriver) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
    info := &csi.NodeGetInfoResponse{}
    info.NodeId = d.nodeID
    return info, nil
}

// Helper methods
// createVolumeBackend is the hook for creating the storage behind a volume.
// It is a placeholder: size and parameters are ignored, and the returned
// VolumeInfo carries volumeID and a context holding only the volume type.
// The error is always nil.
func (d *CSIDriver) createVolumeBackend(volumeID, volumeType string, size int64, parameters map[string]string) (*VolumeInfo, error) {
    info := &VolumeInfo{ID: volumeID}
    info.Context = map[string]string{"type": volumeType}
    return info, nil
}

// deleteVolumeBackend is the hook for deleting the storage behind volumeID.
// It is a placeholder: nothing is deleted and nil is always returned.
func (d *CSIDriver) deleteVolumeBackend(volumeID string) error {
    return nil
}

// attachVolumeToNode is the hook for attaching volumeID to nodeID. It is a
// placeholder: both arguments are ignored and a publish context naming the
// fixed device "/dev/sdb" is returned with a nil error.
func (d *CSIDriver) attachVolumeToNode(volumeID, nodeID string) (map[string]string, error) {
    publishContext := make(map[string]string, 1)
    publishContext["device"] = "/dev/sdb"
    return publishContext, nil
}

// stageVolume is the hook for staging volumeID at stagingPath using the
// publish context. It is a placeholder: nothing is staged and nil is always
// returned.
func (d *CSIDriver) stageVolume(volumeID, stagingPath string, publishContext map[string]string) error {
    return nil
}

// publishVolume is the hook for exposing the volume staged at stagingPath at
// targetPath. It is a placeholder: nothing is published and nil is always
// returned.
func (d *CSIDriver) publishVolume(volumeID, stagingPath, targetPath string) error {
    return nil
}

// VolumeInfo describes a volume created by the backend.
type VolumeInfo struct {
    // ID is the volume's identifier.
    ID string
    // Context holds key/value attributes of the volume.
    Context map[string]string
}

// generateVolumeID returns a new volume identifier of the form
// "vol-<nanoseconds since the Unix epoch>".
//
// Nanosecond resolution replaces the earlier whole-second timestamp, under
// which every volume created within the same second received the same ID.
// The ID is still purely time-based, so it is not a hard uniqueness guarantee
// across processes or hosts.
func generateVolumeID() string {
    return fmt.Sprintf("vol-%d", time.Now().UnixNano())
}

// main creates the driver, listens on the Unix socket /tmp/csi.sock,
// registers the driver as the CSI Identity, Controller and Node server on a
// new gRPC server, and serves until the server stops.
//
// It panics if the socket cannot be opened or if serving ends with an error;
// the latter was previously ignored.
func main() {
    driver := &CSIDriver{
        name:    "custom.csi.driver",
        version: "1.0.0",
        nodeID:  "node-1",
    }

    listener, err := net.Listen("unix", "/tmp/csi.sock")
    if err != nil {
        panic(err)
    }

    server := grpc.NewServer()
    csi.RegisterIdentityServer(server, driver)
    csi.RegisterControllerServer(server, driver)
    csi.RegisterNodeServer(server, driver)

    if err := server.Serve(listener); err != nil {
        panic(err)
    }
}

Summary

This section covered advanced networking and storage techniques:

Custom Network Solutions

  • CNI Plugins: Container Network Interface plugin development
  • Docker Network Plugins: Custom network drivers for Docker
  • Software-Defined Networking: Open vSwitch (OVS) and SDN controller integration
  • Network Function Virtualization: Virtual firewalls, load balancers, and routers

Advanced Storage Systems

  • Custom Volume Drivers: Docker volume plugin development with encryption and replication
  • CSI Implementation: Container Storage Interface driver for Kubernetes integration
  • Storage Orchestration: Automated storage provisioning and management

Enterprise Patterns

  • Plugin Architecture: Extensible networking and storage solutions
  • Service Orchestration: Automated network function deployment
  • Performance Optimization: Advanced tuning and monitoring capabilities
  • Security Integration: Encryption, access control, and compliance features

Next Steps: Part 5 demonstrates complete production implementations combining all these advanced techniques into enterprise-ready networking and storage solutions.