Skip to content

Commit 7aa9612

Browse files
authored
feat(bigtable): Add experimental Bigtable connection pool with custom load balancing strategy. (#12882)
Configurable Load Balancing: The pool accepts a btopt.LoadBalancingStrategy on creation, which determines how it selects a connection for a new request. The following strategies are implemented: LeastInFlight: Scans all connections and picks the one with the absolute lowest number of active requests. PowerOfTwoLeastInFlight: Picks two random connections and routes the request to the one with the lower load. This is a common and efficient alternative to a full scan. RoundRobin: The default strategy, which cycles through connections sequentially.
1 parent 136f25f commit 7aa9612

File tree

4 files changed

+1016
-5
lines changed

4 files changed

+1016
-5
lines changed

bigtable/bigtable.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333

3434
btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
3535
btopt "cloud.google.com/go/bigtable/internal/option"
36+
btransport "cloud.google.com/go/bigtable/internal/transport"
3637
"cloud.google.com/go/internal/trace"
3738
gax "github.com/googleapis/gax-go/v2"
3839
"github.com/googleapis/gax-go/v2/apierror"
@@ -55,6 +56,8 @@ const (
5556
queryExpiredViolationType = "PREPARED_QUERY_EXPIRED"
5657
preparedQueryExpireEarlyDuration = time.Second
5758
methodNameReadRows = "ReadRows"
59+
// Cannot extract extract d.GRPCConnPoolSize as DialSettings is in internal grpc pacakage
60+
defaultBigtableConnPoolSize = 10
5861

5962
// For routing cookie
6063
cookiePrefix = "x-goog-cbt-cookie-"
@@ -156,11 +159,6 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
156159
// TODO(b/372244283): Remove after b/358175516 has been fixed
157160
o = append(o, internaloption.EnableAsyncRefreshDryRun(metricsTracerFactory.newAsyncRefreshErrHandler()))
158161

159-
connPool, err := gtransport.DialPool(ctx, o...)
160-
if err != nil {
161-
return nil, err
162-
}
163-
164162
disableRetryInfo := false
165163

166164
// If DISABLE_RETRY_INFO=1, library does not base retry decision and back off time on server returned RetryInfo value.
@@ -172,6 +170,23 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
172170
retryOption = clientOnlyRetryOption
173171
executeQueryRetryOption = clientOnlyExecuteQueryRetryOption
174172
}
173+
174+
var connPool gtransport.ConnPool
175+
var connPoolErr error
176+
enableBigtableConnPool := btopt.EnableBigtableConnectionPool()
177+
if enableBigtableConnPool {
178+
connPool, connPoolErr = btransport.NewBigtableChannelPool(defaultBigtableConnPoolSize, btopt.BigtableLoadBalancingStrategy(), func() (*grpc.ClientConn, error) {
179+
return gtransport.Dial(ctx, o...)
180+
})
181+
} else {
182+
// use to regular ConnPool
183+
connPool, connPoolErr = gtransport.DialPool(ctx, o...)
184+
}
185+
186+
if connPoolErr != nil {
187+
return nil, connPoolErr
188+
}
189+
175190
return &Client{
176191
connPool: connPool,
177192
client: btpb.NewBigtableClient(connPool),

bigtable/internal/option/option.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"fmt"
2323
"os"
2424
"strconv"
25+
"strings"
2526
"time"
2627

2728
"cloud.google.com/go/bigtable/internal"
@@ -34,6 +35,19 @@ import (
3435
"google.golang.org/grpc/metadata"
3536
)
3637

38+
const (
39+
// LoadBalancingStrategyEnvVar is the environment variable to control the gRPC load balancing strategy.
40+
LoadBalancingStrategyEnvVar = "CBT_LOAD_BALANCING_STRATEGY"
41+
// RoundRobinLBPolicy is the policy name for round-robin.
42+
RoundRobinLBPolicy = "round_robin"
43+
// LeastInFlightLBPolicy is the policy name for least in flight (custom).
44+
LeastInFlightLBPolicy = "least_in_flight"
45+
// PowerOfTwoLeastInFlightLBPolicy is the policy name for power of two least in flight (custom).
46+
PowerOfTwoLeastInFlightLBPolicy = "power_of_two_least_in_flight"
47+
// BigtableConnectionPoolEnvVar is the env var for enabling Bigtable Connection Pool.
48+
BigtableConnectionPoolEnvVar = "CBT_BIGTABLE_CONN_POOL"
49+
)
50+
3751
// mergeOutgoingMetadata returns a context populated by the existing outgoing
3852
// metadata merged with the provided mds.
3953
func mergeOutgoingMetadata(ctx context.Context, mds ...metadata.MD) context.Context {
@@ -124,3 +138,66 @@ func ClientInterceptorOptions(stream []grpc.StreamClientInterceptor, unary []grp
124138
option.WithGRPCDialOption(grpc.WithChainUnaryInterceptor(unary...)),
125139
}
126140
}
141+
142+
// LoadBalancingStrategy for connection pool.
143+
type LoadBalancingStrategy int
144+
145+
const (
146+
// RoundRobin is the round_robin gRPC load balancing policy.
147+
RoundRobin LoadBalancingStrategy = iota
148+
// LeastInFlight is the least_in_flight gRPC load balancing policy (custom).
149+
LeastInFlight
150+
// PowerOfTwoLeastInFlight is the power_of_two_least_in_flight gRPC load balancing policy (custom).
151+
PowerOfTwoLeastInFlight
152+
)
153+
154+
// String returns the string representation of the LoadBalancingStrategy.
155+
func (s LoadBalancingStrategy) String() string {
156+
switch s {
157+
case LeastInFlight:
158+
return "least_in_flight"
159+
case PowerOfTwoLeastInFlight:
160+
return "power_of_two_least_in_flight"
161+
case RoundRobin:
162+
return "round_robin"
163+
default:
164+
return "round_robin" // Default
165+
}
166+
}
167+
168+
// parseLoadBalancingStrategy parses the string from the environment variable
169+
// into a LoadBalancingStrategy enum value.
170+
func parseLoadBalancingStrategy(strategyStr string) LoadBalancingStrategy {
171+
switch strings.ToUpper(strategyStr) {
172+
case "LEAST_IN_FLIGHT":
173+
return LeastInFlight
174+
case "POWER_OF_TWO_LEAST_IN_FLIGHT":
175+
return PowerOfTwoLeastInFlight
176+
case "ROUND_ROBIN":
177+
return RoundRobin
178+
case "":
179+
return RoundRobin // Default if env var is not set
180+
default:
181+
return RoundRobin // Default for unknown values
182+
}
183+
}
184+
185+
// BigtableLoadBalancingStrategy returns the gRPC service config JSON string for the chosen policy.
186+
func BigtableLoadBalancingStrategy() LoadBalancingStrategy {
187+
strategyStr := os.Getenv(LoadBalancingStrategyEnvVar)
188+
return parseLoadBalancingStrategy(strategyStr)
189+
}
190+
191+
// EnableBigtableConnectionPool uses new conn pool if envVar is set.
192+
func EnableBigtableConnectionPool() bool {
193+
bigtableConnPoolEnvVal := os.Getenv(BigtableConnectionPoolEnvVar)
194+
if bigtableConnPoolEnvVal == "" {
195+
return false
196+
}
197+
enableBigtableConnPool, err := strconv.ParseBool(bigtableConnPoolEnvVal)
198+
if err != nil {
199+
// just fail and use default conn pool
200+
return false
201+
}
202+
return enableBigtableConnPool
203+
}

0 commit comments

Comments
 (0)