Integration with Go Microservices

Now let’s explore how to integrate circuit breakers with common Go microservice patterns.

HTTP Client Integration

Integrating circuit breakers with HTTP clients is a common use case:

package resilience

import (
    "context"
    "net/http"
    "time"
    
    "example.com/circuitbreaker"
)

// ResilienceClient wraps an HTTP client with resilience patterns
type ResilienceClient struct {
    client        *http.Client
    circuitBreaker circuitbreaker.CircuitBreaker
    bulkhead      *bulkhead.Bulkhead
    timeout       time.Duration
}

// NewResilienceClient creates a new resilient HTTP client
func NewResilienceClient(
    client *http.Client,
    cb circuitbreaker.CircuitBreaker,
    bh *bulkhead.Bulkhead,
    timeout time.Duration,
) *ResilienceClient {
    return &ResilienceClient{
        client:        client,
        circuitBreaker: cb,
        bulkhead:      bh,
        timeout:       timeout,
    }
}

// Do performs an HTTP request with resilience patterns
func (c *ResilienceClient) Do(req *http.Request) (*http.Response, error) {
    // Apply timeout
    ctx, cancel := context.WithTimeout(req.Context(), c.timeout)
    defer cancel()
    req = req.WithContext(ctx)
    
    // Check circuit breaker
    if !c.circuitBreaker.AllowRequest() {
        return nil, circuitbreaker.ErrCircuitOpen
    }
    
    // Apply bulkhead
    if !c.bulkhead.Acquire() {
        return nil, bulkhead.ErrBulkheadFull
    }
    defer c.bulkhead.Release()
    
    // Execute request
    var resp *http.Response
    var err error
    
    err = c.circuitBreaker.Execute(func() error {
        resp, err = c.client.Do(req)
        if err != nil {
            return err
        }
        
        // Consider certain status codes as failures
        if resp.StatusCode >= 500 {
            return &httpError{statusCode: resp.StatusCode}
        }
        
        return nil
    })
    
    return resp, err
}

// httpError represents an HTTP error with status code
type httpError struct {
    statusCode int
}

func (e *httpError) Error() string {
    return fmt.Sprintf("HTTP error: %d", e.statusCode)
}

gRPC Client Integration

Similarly, we can integrate circuit breakers with gRPC clients:

package resilience

import (
    "context"
    "time"
    "strings"
    "sync"
    
    "example.com/circuitbreaker"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// ResilienceInterceptor creates a gRPC client interceptor with resilience patterns
func ResilienceInterceptor(
    cb circuitbreaker.CircuitBreaker,
    bh *bulkhead.Bulkhead,
    timeout time.Duration,
) grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        // Apply timeout
        timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()
        
        // Check circuit breaker
        if !cb.AllowRequest() {
            return status.Error(codes.Unavailable, "circuit breaker is open")
        }
        
        // Apply bulkhead
        if !bh.Acquire() {
            return status.Error(codes.ResourceExhausted, "bulkhead is full")
        }
        defer bh.Release()
        
        // Execute request with circuit breaker
        var err error
        err = cb.Execute(func() error {
            return invoker(timeoutCtx, method, req, reply, cc, opts...)
        })
        
        // Record result in circuit breaker based on error type
        if err != nil {
            // Only certain error types should trip the circuit breaker
            s, ok := status.FromError(err)
            if ok {
                // Only server errors should trip the circuit breaker
                switch s.Code() {
                case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal, codes.Unknown:
                    cb.RecordResult(false)
                default:
                    // Client errors should not trip the circuit breaker
                    cb.RecordResult(true)
                }
            } else {
                // Non-status errors (like context cancellation) should trip the circuit breaker
                cb.RecordResult(false)
            }
            return err
        }
        
        // Record success
        cb.RecordResult(true)
        return nil
    }
}

// ServiceMethod represents a gRPC service method
type ServiceMethod struct {
    Service string
    Method  string
}

// PerServiceCircuitBreaker manages circuit breakers per gRPC service
type PerServiceCircuitBreaker struct {
    breakers map[ServiceMethod]circuitbreaker.CircuitBreaker
    factory  func() circuitbreaker.CircuitBreaker
    mutex    sync.RWMutex
}

// NewPerServiceCircuitBreaker creates a new per-service circuit breaker
func NewPerServiceCircuitBreaker(factory func() circuitbreaker.CircuitBreaker) *PerServiceCircuitBreaker {
    return &PerServiceCircuitBreaker{
        breakers: make(map[ServiceMethod]circuitbreaker.CircuitBreaker),
        factory:  factory,
    }
}

// UnaryInterceptor creates a gRPC interceptor that uses per-service circuit breakers
func (p *PerServiceCircuitBreaker) UnaryInterceptor() grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        // Parse service and method from full method name
        parts := strings.Split(method, "/")
        if len(parts) != 3 {
            return invoker(ctx, method, req, reply, cc, opts...)
        }
        
        serviceMethod := ServiceMethod{
            Service: parts[1],
            Method:  parts[2],
        }
        
        // Get or create circuit breaker for this service method
        p.mutex.RLock()
        cb, exists := p.breakers[serviceMethod]
        p.mutex.RUnlock()
        
        if !exists {
            p.mutex.Lock()
            cb, exists = p.breakers[serviceMethod]
            if !exists {
                cb = p.factory()
                p.breakers[serviceMethod] = cb
            }
            p.mutex.Unlock()
        }
        
        // Check circuit breaker
        if !cb.AllowRequest() {
            return status.Error(codes.Unavailable, "circuit breaker is open")
        }
        
        // Execute request with circuit breaker
        err := cb.Execute(func() error {
            return invoker(ctx, method, req, reply, cc, opts...)
        })
        
        return err
    }
}

Database Connection Circuit Breaker

Circuit breakers are also valuable for database connections:

package database

import (
    "context"
    "database/sql"
    "time"
    
    "example.com/circuitbreaker"
)

// ResilienceDB wraps a database with resilience patterns
type ResilienceDB struct {
    db            *sql.DB
    circuitBreaker circuitbreaker.CircuitBreaker
    timeout       time.Duration
}

// NewResilienceDB creates a new resilient database wrapper
func NewResilienceDB(
    db *sql.DB,
    cb circuitbreaker.CircuitBreaker,
    timeout time.Duration,
) *ResilienceDB {
    return &ResilienceDB{
        db:            db,
        circuitBreaker: cb,
        timeout:       timeout,
    }
}

// Query executes a query with resilience patterns
func (r *ResilienceDB) Query(query string, args ...interface{}) (*sql.Rows, error) {
    // Check circuit breaker
    if !r.circuitBreaker.AllowRequest() {
        return nil, circuitbreaker.ErrCircuitOpen
    }
    
    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    
    // Execute query with circuit breaker
    var rows *sql.Rows
    var err error
    
    err = r.circuitBreaker.Execute(func() error {
        rows, err = r.db.QueryContext(ctx, query, args...)
        return err
    })
    
    return rows, err
}

// Exec executes a statement with resilience patterns
func (r *ResilienceDB) Exec(query string, args ...interface{}) (sql.Result, error) {
    // Check circuit breaker
    if !r.circuitBreaker.AllowRequest() {
        return nil, circuitbreaker.ErrCircuitOpen
    }
    
    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    
    // Execute statement with circuit breaker
    var result sql.Result
    var err error
    
    err = r.circuitBreaker.Execute(func() error {
        result, err = r.db.ExecContext(ctx, query, args...)
        return err
    })
    
    return result, err
}

// Transaction executes a transaction with resilience patterns
func (r *ResilienceDB) Transaction(fn func(*sql.Tx) error) error {
    // Check circuit breaker
    if !r.circuitBreaker.AllowRequest() {
        return circuitbreaker.ErrCircuitOpen
    }
    
    // Create context with timeout
    ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
    defer cancel()
    
    // Execute transaction with circuit breaker
    return r.circuitBreaker.Execute(func() error {
        tx, err := r.db.BeginTx(ctx, nil)
        if err != nil {
            return err
        }
        
        // If fn returns an error, the transaction will be rolled back
        if err := fn(tx); err != nil {
            tx.Rollback()
            return err
        }
        
        // Commit the transaction
        return tx.Commit()
    })
}