Client-Side Optimization
Optimizing client configurations is equally important:
package main
import (
"context"
"log"
"time"
"github.com/example/service/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)
// main dials the UserService at localhost:50051 with performance-oriented
// client options, issues a single GetUser call bounded by a 5-second
// deadline, and logs the response. Any failure terminates the process.
func main() {
	// Configure client options for performance.
	opts := []grpc.DialOption{
		// Plaintext transport: no TLS is negotiated.
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		// Enable keepalive.
		// NOTE(review): a server may drop clients that ping more often than
		// its keepalive enforcement policy allows — confirm the server is
		// configured to accept 10-second pings without active streams.
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                10 * time.Second, // Send pings every 10 seconds if there is no activity
			Timeout:             time.Second,      // Wait 1 second for ping ack before considering the connection dead
			PermitWithoutStream: true,             // Send pings even without active streams
		}),
		// Set initial per-stream window size (bytes).
		grpc.WithInitialWindowSize(1 * 1024 * 1024), // 1MB
		// Set initial connection window size (bytes).
		grpc.WithInitialConnWindowSize(1 * 1024 * 1024), // 1MB
		// Enable wait-for-ready semantics on every call by default.
		grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
	}

	// Connect to the server.
	conn, err := grpc.Dial("localhost:50051", opts...)
	if err != nil {
		log.Fatalf("failed to connect: %v", err)
	}
	defer conn.Close()

	// Create client.
	client := proto.NewUserServiceClient(conn)

	// Set timeout for the request.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Make the request.
	user, err := client.GetUser(ctx, &proto.GetUserRequest{
		UserId: "user123",
	})
	if err != nil {
		// log.Fatalf terminates the process without running deferred calls,
		// so release the context and the connection explicitly first.
		cancel()
		conn.Close()
		log.Fatalf("request failed: %v", err)
	}
	log.Printf("Response: %v", user)
}
Connection Management and Pooling
Effective connection management is crucial for maintaining high performance in production gRPC services.
Client Connection Pooling
Implementing a connection pool helps manage resources efficiently:
package grpcpool
import (
"context"
"errors"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// Pool is a mutex-guarded pool of gRPC client connections. Idle connections
// are kept on a stack, and new ones are produced on demand by a
// caller-supplied factory.
type Pool struct {
	// mu serializes access to the mutable state (connections, numOpen, closed).
	mu sync.Mutex

	// factory creates a new connection when no idle one is available.
	factory func() (*grpc.ClientConn, error)

	// maxIdle is the number of idle connections Put will retain.
	maxIdle int

	// maxOpen is the limit Get compares numOpen against before creating a
	// new connection.
	maxOpen int

	// connections holds the idle connections; Get takes from the end.
	connections []*grpc.ClientConn

	// numOpen counts connections created by the pool and not yet discarded.
	numOpen int

	// closed is set once Close has run.
	closed bool
}
// NewPool creates a connection pool.
//
// factory is called whenever the pool needs a new connection. maxIdle is the
// number of idle connections the pool retains, and maxOpen is the number of
// connections that may be open at the same time. A negative maxIdle is
// accepted and results in a pool that keeps no idle connections.
func NewPool(factory func() (*grpc.ClientConn, error), maxIdle, maxOpen int) *Pool {
	// make panics on a negative capacity, so clamp the pre-allocation size.
	// The stored maxIdle is left untouched.
	idleCap := maxIdle
	if idleCap < 0 {
		idleCap = 0
	}
	return &Pool{
		connections: make([]*grpc.ClientConn, 0, idleCap),
		factory:     factory,
		maxIdle:     maxIdle,
		maxOpen:     maxOpen,
	}
}
// Get returns a connection from the pool.
//
// The most recently returned idle connection is reused when one exists. If
// that connection has been shut down it is closed and replaced by a fresh one
// from the factory. When no idle connection exists, a new one is created as
// long as fewer than maxOpen connections are open; otherwise Get re-checks
// the pool once per second until a connection is available or ctx is done.
//
// It returns ErrClosed if the pool has been closed, ctx.Err() if ctx is done
// while waiting, or the factory's error if creating a connection fails.
// Callers should hand the connection back with Put when finished.
func (p *Pool) Get(ctx context.Context) (*grpc.ClientConn, error) {
	for {
		p.mu.Lock()
		if p.closed {
			p.mu.Unlock()
			return nil, ErrClosed
		}

		// Reuse the most recently returned idle connection, if any.
		if n := len(p.connections); n > 0 {
			conn := p.connections[n-1]
			p.connections[n-1] = nil // do not keep the connection reachable from the slice
			p.connections = p.connections[:n-1]
			p.mu.Unlock()

			if conn.GetState() != connectivity.Shutdown {
				return conn, nil
			}
			// The connection is dead. Close it and fall through to create a
			// replacement. numOpen is deliberately left unchanged: the
			// replacement takes over the slot the dead connection occupied.
			conn.Close()
			break
		}

		// Nothing idle: open a new connection if the limit allows it.
		if p.numOpen < p.maxOpen {
			p.numOpen++ // reserve the slot before dialing outside the lock
			p.mu.Unlock()
			break
		}
		p.mu.Unlock()

		// At capacity: wait a second for a Put, or give up when ctx is done.
		retry := time.NewTimer(time.Second)
		select {
		case <-ctx.Done():
			retry.Stop()
			return nil, ctx.Err()
		case <-retry.C:
		}
	}

	// A slot is reserved for this goroutine; create the connection.
	conn, err := p.factory()
	if err != nil {
		// Release the reserved slot so another caller can use it.
		p.mu.Lock()
		p.numOpen--
		p.mu.Unlock()
		return nil, err
	}
	return conn, nil
}
// Put hands a connection back to the pool.
//
// The connection is closed instead of being retained when the pool is
// already closed or when maxIdle idle connections are already held. A
// connection that is already shut down is simply dropped. The returned error
// is whatever conn.Close reports in the cases where Put closes the
// connection, and nil otherwise.
func (p *Pool) Put(conn *grpc.ClientConn) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	switch {
	case p.closed:
		// The pool no longer tracks connections; just release this one.
		return conn.Close()
	case conn.GetState() == connectivity.Shutdown:
		// Already shut down: forget it and free its slot.
		p.numOpen--
		return nil
	case len(p.connections) >= p.maxIdle:
		// Idle list is full: free the slot and close the surplus connection.
		p.numOpen--
		return conn.Close()
	}

	// Healthy connection and room to spare: keep it for reuse.
	p.connections = append(p.connections, conn)
	return nil
}
// Close marks the pool as closed and closes every idle connection it holds.
// Errors from closing individual connections are not reported. Calling Close
// on a pool that is already closed returns ErrClosed.
func (p *Pool) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return ErrClosed
	}

	// Detach the idle list and reset the bookkeeping, then close each
	// connection that was idle.
	idle := p.connections
	p.closed = true
	p.connections = nil
	p.numOpen = 0

	for i := range idle {
		idle[i].Close()
	}
	return nil
}
// ErrClosed is the sentinel error returned by Get, and by a repeated call to
// Close, once the pool has been closed.
var ErrClosed = errors.New("pool is closed")
Usage example:
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/example/grpcpool"
"github.com/example/service/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// main demonstrates the connection pool: it issues 100 concurrent GetUser
// requests through a pool limited to 20 open and 5 idle connections, waits
// for all of them to finish, and then closes the pool.
func main() {
	// Create a connection factory. WithBlock makes each dial wait until the
	// connection is established before returning.
	factory := func() (*grpc.ClientConn, error) {
		return grpc.Dial("localhost:50051",
			grpc.WithTransportCredentials(insecure.NewCredentials()),
			grpc.WithBlock(),
		)
	}

	// Create a connection pool with max 5 idle connections and max 20 open connections.
	pool := grpcpool.NewPool(factory, 5, 20)
	defer pool.Close()

	const numRequests = 100

	// Each worker signals on done when it exits. The buffer lets workers
	// finish without waiting for main to receive.
	done := make(chan struct{}, numRequests)

	// Use the pool for multiple requests.
	for i := 0; i < numRequests; i++ {
		go func(id int) {
			defer func() { done <- struct{}{} }()

			// One deadline covers both acquiring a connection and the RPC.
			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()

			// Get a connection from the pool.
			conn, err := pool.Get(ctx)
			if err != nil {
				log.Printf("Failed to get connection: %v", err)
				return
			}
			// Return the connection to the pool on every exit path.
			defer func() {
				if err := pool.Put(conn); err != nil {
					log.Printf("Failed to return connection: %v", err)
				}
			}()

			// Use the connection.
			client := proto.NewUserServiceClient(conn)
			user, err := client.GetUser(ctx, &proto.GetUserRequest{
				UserId: fmt.Sprintf("user%d", id),
			})
			if err != nil {
				log.Printf("Request failed: %v", err)
				return
			}
			log.Printf("Got user: %v", user.Name)
		}(i)
	}

	// Wait for every request to complete. Unlike a fixed sleep, this
	// guarantees the deferred pool.Close runs only after all workers have
	// returned their connections.
	for i := 0; i < numRequests; i++ {
		<-done
	}
}
Server Connection Management
On the server side, managing connections effectively is equally important:
package main
import (
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"github.com/example/service/proto"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
// main starts a gRPC server for UserService on :50051 and shuts it down
// gracefully when the process receives SIGINT or SIGTERM. If in-flight RPCs
// have not finished within the shutdown timeout, the server is stopped
// forcibly so the process can still exit.
func main() {
	// Upper bound on how long shutdown waits for in-flight RPCs.
	const shutdownTimeout = 30 * time.Second

	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Create a new gRPC server with connection management options.
	server := grpc.NewServer(
		// Set maximum number of concurrent streams per connection.
		grpc.MaxConcurrentStreams(100),
		// Set the time limit for establishing a new connection.
		grpc.ConnectionTimeout(5*time.Second),
	)

	// Register service.
	proto.RegisterUserServiceServer(server, &userService{})

	// Start server in a goroutine so main can wait for signals.
	go func() {
		log.Println("Starting gRPC server on :50051")
		if err := server.Serve(lis); err != nil {
			log.Fatalf("failed to serve: %v", err)
		}
	}()

	// Set up graceful shutdown. The channel is buffered so a signal that
	// arrives before main is receiving is not lost.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)

	// Wait for shutdown signal.
	<-stop
	log.Println("Shutting down gRPC server...")

	// Gracefully stop the server. GracefulStop blocks until all pending RPCs
	// are finished, so run it in the background and bound the wait.
	stopped := make(chan struct{})
	go func() {
		server.GracefulStop()
		close(stopped)
	}()

	select {
	case <-stopped:
	case <-time.After(shutdownTimeout):
		log.Println("Graceful shutdown timed out; forcing stop")
		server.Stop()
		<-stopped // GracefulStop returns once Stop has torn everything down
	}
	log.Println("Server stopped")
}