Integration with Go Microservices
Now let’s explore how to integrate circuit breakers with common Go microservice patterns.
HTTP Client Integration
Integrating circuit breakers with HTTP clients is a common use case:
package resilience
import (
	"context"
	"fmt"
	"io"
	"net/http"
	"time"

	"example.com/circuitbreaker"
)
// ResilienceClient bundles an *http.Client with the circuit breaker,
// bulkhead, and timeout that its Do method applies to every request.
type ResilienceClient struct {
// client is the underlying HTTP client that Do sends requests through.
client *http.Client
// circuitBreaker is consulted with AllowRequest before a request is sent
// and wraps the actual call with Execute.
circuitBreaker circuitbreaker.CircuitBreaker
// bulkhead is acquired before a request is sent and released when Do returns.
bulkhead *bulkhead.Bulkhead
// timeout is the duration passed to context.WithTimeout for each request.
timeout time.Duration
}
// NewResilienceClient assembles a ResilienceClient from the HTTP client,
// circuit breaker, bulkhead, and timeout it is given. The arguments are
// stored as-is; nothing is validated or defaulted.
func NewResilienceClient(
	client *http.Client,
	cb circuitbreaker.CircuitBreaker,
	bh *bulkhead.Bulkhead,
	timeout time.Duration,
) *ResilienceClient {
	rc := new(ResilienceClient)
	rc.client = client
	rc.circuitBreaker = cb
	rc.bulkhead = bh
	rc.timeout = timeout
	return rc
}
// Do sends req through the wrapped HTTP client, guarded by the circuit
// breaker, the bulkhead, and a timeout derived from the request's context.
//
// It returns circuitbreaker.ErrCircuitOpen when the breaker rejects the
// request and bulkhead.ErrBulkheadFull when no bulkhead slot is available.
// Responses with a status code of 500 or above are reported to the circuit
// breaker as failures; in that case both the response and the error are
// returned.
//
// Whenever the returned response is non-nil the caller must close its Body.
// The timeout context stays alive until the body is closed (or the timeout
// fires), so the timeout also bounds reading the body.
func (c *ResilienceClient) Do(req *http.Request) (*http.Response, error) {
	// Check circuit breaker
	if !c.circuitBreaker.AllowRequest() {
		return nil, circuitbreaker.ErrCircuitOpen
	}

	// Apply bulkhead
	if !c.bulkhead.Acquire() {
		return nil, bulkhead.ErrBulkheadFull
	}
	defer c.bulkhead.Release()

	// Apply timeout. The cancel function is deliberately NOT deferred: the
	// request context governs reading the response body as well, so
	// cancelling on return would break the body for the caller.
	ctx, cancel := context.WithTimeout(req.Context(), c.timeout)
	req = req.WithContext(ctx)

	// Execute request
	var resp *http.Response
	err := c.circuitBreaker.Execute(func() error {
		var doErr error
		resp, doErr = c.client.Do(req)
		if doErr != nil {
			return doErr
		}

		// Consider certain status codes as failures
		if resp.StatusCode >= 500 {
			return &httpError{statusCode: resp.StatusCode}
		}
		return nil
	})

	if resp == nil {
		// Nothing was handed to the caller, so release the context now.
		cancel()
		return nil, err
	}

	// Tie the context's lifetime to the body: closing the body cancels it.
	resp.Body = &cancelOnCloseBody{ReadCloser: resp.Body, cancel: cancel}
	return resp, err
}

// cancelOnCloseBody wraps a response body so that closing it also releases
// the timeout context the request was issued with.
type cancelOnCloseBody struct {
	io.ReadCloser
	cancel context.CancelFunc
}

// Close closes the underlying body and then cancels the request context.
func (b *cancelOnCloseBody) Close() error {
	err := b.ReadCloser.Close()
	b.cancel()
	return err
}
// httpError is the error reported for a response whose status code was
// classified as a failure. It carries only that status code.
type httpError struct {
	statusCode int
}

// Error implements the error interface, rendering the status code in decimal.
func (e *httpError) Error() string {
	code := e.statusCode
	return fmt.Sprintf("HTTP error: %d", code)
}
gRPC Client Integration
Similarly, we can integrate circuit breakers with gRPC clients:
package resilience
import (
"context"
"time"
"strings"
"sync"
"example.com/circuitbreaker"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// ResilienceInterceptor returns a gRPC unary client interceptor that guards
// every call with the circuit breaker cb, the bulkhead bh, and a per-call
// timeout.
//
// A call rejected by the circuit breaker fails with codes.Unavailable; a call
// rejected by the bulkhead fails with codes.ResourceExhausted. Otherwise the
// call is invoked with a context limited to timeout and exactly one result is
// recorded on the circuit breaker, classified by isBreakerFailure.
func ResilienceInterceptor(
	cb circuitbreaker.CircuitBreaker,
	bh *bulkhead.Bulkhead,
	timeout time.Duration,
) grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		// Check circuit breaker
		if !cb.AllowRequest() {
			return status.Error(codes.Unavailable, "circuit breaker is open")
		}

		// Apply bulkhead
		if !bh.Acquire() {
			return status.Error(codes.ResourceExhausted, "bulkhead is full")
		}
		defer bh.Release()

		// Apply timeout
		timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()

		// Invoke directly rather than through cb.Execute: the outcome is
		// classified and recorded below, and routing the call through Execute
		// as well would presumably record the same call a second time without
		// the classification (NOTE(review): confirm Execute's bookkeeping).
		err := invoker(timeoutCtx, method, req, reply, cc, opts...)
		cb.RecordResult(!isBreakerFailure(err))
		return err
	}
}

// isBreakerFailure reports whether err should count against the circuit
// breaker. A nil error never does. Errors that carry no gRPC status always
// do. Status errors count only for codes that indicate a server-side or
// availability problem; all other codes are treated as client errors.
func isBreakerFailure(err error) bool {
	if err == nil {
		return false
	}

	s, ok := status.FromError(err)
	if !ok {
		return true
	}

	switch s.Code() {
	case codes.Unavailable, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Internal, codes.Unknown:
		return true
	default:
		// Client errors should not trip the circuit breaker
		return false
	}
}
// ServiceMethod identifies one gRPC method by its service and method names.
// PerServiceCircuitBreaker uses it as the key of its circuit breaker map.
type ServiceMethod struct {
// Service is the second "/"-separated segment of the full method name.
Service string
// Method is the third "/"-separated segment of the full method name.
Method string
}
// PerServiceCircuitBreaker keeps one circuit breaker per ServiceMethod, i.e.
// per service/method pair, creating each breaker on first use.
type PerServiceCircuitBreaker struct {
// breakers holds the circuit breakers created so far, keyed by service and method.
breakers map[ServiceMethod]circuitbreaker.CircuitBreaker
// factory builds the circuit breaker for a key that has none yet.
factory func() circuitbreaker.CircuitBreaker
// mutex guards breakers: read-locked for lookups, write-locked for inserts.
mutex sync.RWMutex
}
// NewPerServiceCircuitBreaker returns a PerServiceCircuitBreaker with an
// empty breaker map. factory is stored and later called to build the circuit
// breaker for each service method that does not have one yet.
func NewPerServiceCircuitBreaker(factory func() circuitbreaker.CircuitBreaker) *PerServiceCircuitBreaker {
	registry := new(PerServiceCircuitBreaker)
	registry.factory = factory
	registry.breakers = map[ServiceMethod]circuitbreaker.CircuitBreaker{}
	return registry
}
// UnaryInterceptor returns a gRPC unary client interceptor that routes each
// call through the circuit breaker belonging to the call's service method.
//
// The full method name is split on "/". When that does not yield exactly
// three segments the call is invoked directly with no circuit breaker.
// Otherwise the matching breaker is looked up (or created), and the call
// fails with codes.Unavailable if the breaker does not allow it.
func (p *PerServiceCircuitBreaker) UnaryInterceptor() grpc.UnaryClientInterceptor {
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		opts ...grpc.CallOption,
	) error {
		call := func() error {
			return invoker(ctx, method, req, reply, cc, opts...)
		}

		// A well-formed name "/service/method" splits into "", service, method.
		segments := strings.Split(method, "/")
		if len(segments) != 3 {
			return call()
		}

		key := ServiceMethod{Service: segments[1], Method: segments[2]}
		breaker := p.breakerFor(key)

		if breaker.AllowRequest() {
			return breaker.Execute(call)
		}
		return status.Error(codes.Unavailable, "circuit breaker is open")
	}
}

// breakerFor returns the circuit breaker registered for key, creating and
// registering one with the factory when none exists yet.
func (p *PerServiceCircuitBreaker) breakerFor(key ServiceMethod) circuitbreaker.CircuitBreaker {
	// Fast path: most lookups find an existing breaker under the read lock.
	p.mutex.RLock()
	breaker, found := p.breakers[key]
	p.mutex.RUnlock()
	if found {
		return breaker
	}

	// Slow path: re-check under the write lock, since another goroutine may
	// have inserted the breaker between the two lock acquisitions.
	p.mutex.Lock()
	breaker, found = p.breakers[key]
	if !found {
		breaker = p.factory()
		p.breakers[key] = breaker
	}
	p.mutex.Unlock()
	return breaker
}
Database Connection Circuit Breaker
Circuit breakers are also valuable for database connections:
package database
import (
"context"
"database/sql"
"time"
"example.com/circuitbreaker"
)
// ResilienceDB wraps a *sql.DB so that Query, Exec, and Transaction run
// through a circuit breaker under a timeout-bounded context.
type ResilienceDB struct {
// db is the underlying database handle that executes the statements.
db *sql.DB
// circuitBreaker is checked with AllowRequest before each operation and
// wraps the database call with Execute.
circuitBreaker circuitbreaker.CircuitBreaker
// timeout is the duration passed to context.WithTimeout for each operation.
timeout time.Duration
}
// NewResilienceDB assembles a ResilienceDB from the database handle, circuit
// breaker, and timeout it is given. The arguments are stored as-is; nothing
// is validated or defaulted.
func NewResilienceDB(
	db *sql.DB,
	cb circuitbreaker.CircuitBreaker,
	timeout time.Duration,
) *ResilienceDB {
	wrapped := new(ResilienceDB)
	wrapped.db = db
	wrapped.circuitBreaker = cb
	wrapped.timeout = timeout
	return wrapped
}
// Query runs a row-returning query through the circuit breaker under a
// context limited to the configured timeout.
//
// It returns circuitbreaker.ErrCircuitOpen when the breaker rejects the
// call. On success the caller owns the returned rows and must close them.
// The rows remain bound to the timeout context, so the timeout covers
// iterating over them as well as executing the query.
func (r *ResilienceDB) Query(query string, args ...interface{}) (*sql.Rows, error) {
	// Check circuit breaker
	if !r.circuitBreaker.AllowRequest() {
		return nil, circuitbreaker.ErrCircuitOpen
	}

	// Create context with timeout. The cancel function is deliberately NOT
	// deferred: database/sql closes the rows once their context is cancelled,
	// so cancelling on return would hand the caller rows that are already
	// being torn down.
	ctx, cancel := context.WithTimeout(context.Background(), r.timeout)

	// Execute query with circuit breaker
	var rows *sql.Rows
	err := r.circuitBreaker.Execute(func() error {
		var queryErr error
		rows, queryErr = r.db.QueryContext(ctx, query, args...)
		return queryErr
	})
	if err != nil {
		// No usable rows were produced, so release the context right away.
		cancel()
		return nil, err
	}

	// On success the context must outlive this call; its resources are
	// released when the deadline fires. The blank assignment records that
	// skipping cancel here is intentional (and keeps go vet's lostcancel
	// check quiet).
	_ = cancel
	return rows, nil
}
// Exec runs a statement that returns no rows through the circuit breaker
// under a context limited to the configured timeout. It returns
// circuitbreaker.ErrCircuitOpen when the breaker rejects the call.
func (r *ResilienceDB) Exec(query string, args ...interface{}) (sql.Result, error) {
	if allowed := r.circuitBreaker.AllowRequest(); !allowed {
		return nil, circuitbreaker.ErrCircuitOpen
	}

	ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
	defer cancel()

	var (
		res     sql.Result
		execErr error
	)
	run := func() error {
		res, execErr = r.db.ExecContext(ctx, query, args...)
		return execErr
	}

	// The value returned by Execute is what the caller sees as the error.
	execErr = r.circuitBreaker.Execute(run)
	return res, execErr
}
// Transaction runs fn inside a database transaction, through the circuit
// breaker and under a context limited to the configured timeout.
//
// It returns circuitbreaker.ErrCircuitOpen when the breaker rejects the
// call. If fn returns an error the transaction is rolled back and that error
// is returned; otherwise the result of the commit is returned.
func (r *ResilienceDB) Transaction(fn func(*sql.Tx) error) error {
	if allowed := r.circuitBreaker.AllowRequest(); !allowed {
		return circuitbreaker.ErrCircuitOpen
	}

	ctx, cancel := context.WithTimeout(context.Background(), r.timeout)
	defer cancel()

	runTx := func() error {
		tx, beginErr := r.db.BeginTx(ctx, nil)
		if beginErr != nil {
			return beginErr
		}

		fnErr := fn(tx)
		if fnErr == nil {
			return tx.Commit()
		}

		// The rollback result is discarded; the error from fn is the one
		// reported to the caller.
		tx.Rollback()
		return fnErr
	}

	return r.circuitBreaker.Execute(runTx)
}