Skip to content

Commit 65a3c4e

Browse files
authored
feat(bigtable): add dynamic channel support (#2) (#13504)
Cherry-pick of sushanb#2. The commit ID is sushanb@8579aed. The delta is the same (+933, -4). ``` curl -L https://github.com/sushanb/google-cloud-go/commit/8579aedcf260adbe4da811b7075242a2bbd3fdd7.patch -o fix.patch git am fix.patch ```
1 parent 6647d91 commit 65a3c4e

File tree

8 files changed

+933
-4
lines changed

8 files changed

+933
-4
lines changed

bigtable/bigtable.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ type Client struct {
8181
executeQueryRetryOption gax.CallOption
8282
enableDirectAccess bool
8383
featureFlagsMD metadata.MD // Pre-computed feature flags metadata to be sent with each request.
84-
84+
dynamicScaleMonitor *btransport.DynamicScaleMonitor
8585
}
8686

8787
// ClientConfig has configurations for the client.
@@ -96,6 +96,9 @@ type ClientConfig struct {
9696
//
9797
// TODO: support user provided meter provider
9898
MetricsProvider MetricsProvider
99+
100+
// If true, enable dynamic channel pool
101+
EnableDynamicChannelPool bool
99102
}
100103

101104
// MetricsProvider is a wrapper for built in metrics meter provider
@@ -181,10 +184,12 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
181184

182185
var connPool gtransport.ConnPool
183186
var connPoolErr error
187+
var dsm *btransport.DynamicScaleMonitor
184188
enableBigtableConnPool := btopt.EnableBigtableConnectionPool()
185189
if enableBigtableConnPool {
186190
fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance)
187-
connPool, connPoolErr = btransport.NewBigtableChannelPool(ctx,
191+
192+
btPool, err := btransport.NewBigtableChannelPool(ctx,
188193
defaultBigtableConnPoolSize,
189194
btopt.BigtableLoadBalancingStrategy(),
190195
func() (*btransport.BigtableConn, error) {
@@ -200,6 +205,22 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
200205
btransport.WithFeatureFlagsMetadata(ffMD),
201206
)
202207

208+
if err != nil {
209+
connPoolErr = err
210+
} else {
211+
connPool = btPool
212+
213+
// Validate dynamic config early if enabled
214+
if config.EnableDynamicChannelPool {
215+
if err := btransport.ValidateDynamicConfig(btopt.DefaultDynamicChannelPoolConfig(), defaultBigtableConnPoolSize); err != nil {
216+
return nil, fmt.Errorf("invalid DynamicChannelPoolConfig: %w", err)
217+
}
218+
219+
dsm = btransport.NewDynamicScaleMonitor(btopt.DefaultDynamicChannelPoolConfig(), btPool)
220+
dsm.Start(ctx) // Start the monitor's background goroutine
221+
}
222+
}
223+
203224
} else {
204225
// use to regular ConnPool
205226
connPool, connPoolErr = gtransport.DialPool(ctx, o...)
@@ -221,11 +242,15 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
221242
executeQueryRetryOption: executeQueryRetryOption,
222243
enableDirectAccess: enableDirectAccess,
223244
featureFlagsMD: ffMD,
245+
dynamicScaleMonitor: dsm,
224246
}, nil
225247
}
226248

227249
// Close closes the Client.
228250
func (c *Client) Close() error {
251+
if c.dynamicScaleMonitor != nil {
252+
c.dynamicScaleMonitor.Stop()
253+
}
229254
if c.metricsTracerFactory != nil {
230255
c.metricsTracerFactory.shutdown()
231256
}

bigtable/internal/option/option.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,3 +226,29 @@ func Debugf(logger *log.Logger, format string, v ...interface{}) {
226226
logf(logger, debugFormat, v...)
227227
}
228228
}
229+
230+
// DynamicChannelPoolConfig holds the parameters for dynamic channel pool scaling.
231+
type DynamicChannelPoolConfig struct {
232+
Enabled bool // Whether dynamic scaling is enabled.
233+
MinConns int // Minimum conns allowed
234+
MaxConns int // Maximum conns allowed.
235+
AvgLoadHighThreshold float64 // Average weighted load per connection to trigger scale-up.
236+
AvgLoadLowThreshold float64 // Average weighted load per connection to trigger scale-down.
237+
MinScalingInterval time.Duration // Minimum time between scaling operations (both up and down).
238+
CheckInterval time.Duration // How often to check if scaling is needed.
239+
MaxRemoveConns int // Maximum number of connections to remove at once.
240+
}
241+
242+
// DefaultDynamicChannelPoolConfig is default settings for dynamic channel pool
243+
func DefaultDynamicChannelPoolConfig() DynamicChannelPoolConfig {
244+
return DynamicChannelPoolConfig{
245+
Enabled: true, // Enabled by default
246+
MinConns: 10,
247+
MaxConns: 200,
248+
AvgLoadHighThreshold: 50,
249+
AvgLoadLowThreshold: 5,
250+
MinScalingInterval: 1 * time.Minute,
251+
CheckInterval: 30 * time.Second,
252+
MaxRemoveConns: 2, // Only Cap for removals
253+
}
254+
}

bigtable/internal/transport/connpool.go

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"math/rand"
2525
"net/url"
2626
"slices"
27+
"sort"
2728
"sync"
2829
"sync/atomic"
2930
"time"
@@ -95,6 +96,7 @@ var _ gtransport.ConnPool = &BigtableChannelPool{}
9596
type BigtableConn struct {
9697
*grpc.ClientConn
9798
isALTSConn atomic.Bool
99+
createdAt atomic.Int64
98100
}
99101

100102
// Prime sends a PingAndWarm request to warm up the connection.
@@ -131,9 +133,17 @@ func (bc *BigtableConn) Prime(ctx context.Context, fullInstanceName, appProfileI
131133

132134
// NewBigtableConn creates a wrapped grpc Client Conn
133135
func NewBigtableConn(conn *grpc.ClientConn) *BigtableConn {
134-
return &BigtableConn{
136+
bc := &BigtableConn{
135137
ClientConn: conn,
136138
}
139+
bc.createdAt.Store(time.Now().UnixMilli())
140+
return bc
141+
142+
}
143+
144+
// createdAt returns the creation time of the connection in int64. milliseconds since epoch
145+
func (bc *BigtableConn) creationTime() int64 {
146+
return bc.createdAt.Load()
137147
}
138148

139149
// connEntry represents a single connection in the pool.
@@ -155,6 +165,15 @@ func (e *connEntry) isALTSUsed() bool {
155165
return e.conn.isALTSConn.Load()
156166
}
157167

168+
// createdAt returns the creation time of the connection in the entry.
169+
// It returns the zero if conn is nil.
170+
func (e *connEntry) createdAt() int64 {
171+
if e.conn == nil {
172+
return 0
173+
}
174+
return e.conn.creationTime()
175+
}
176+
158177
// isDraining atomically checks if the connection is in the draining state.
159178
func (e *connEntry) isDraining() bool {
160179
return e.drainingState.Load()
@@ -584,6 +603,163 @@ func (s *refCountedStream) decrementLoad() {
584603
})
585604
}
586605

606+
// addConnections returns true if the pool size changed.
607+
// TODO: addConnections has a long section where we dial and prime the connections.
608+
// Currently, we are taking dialMu() throughout the section and dialMu() is also required for
609+
// replaceConnection().
610+
//
611+
// Note that DynamicScaleMonitor allows only one evaluateAndScale as it takes a mutex
612+
// during evaluateAndScale so don't expect any size changes in conns
613+
func (p *BigtableChannelPool) addConnections(increaseDelta, maxConns int) bool {
614+
// dialMu access
615+
p.dialMu.Lock()
616+
defer p.dialMu.Unlock()
617+
numCurrent := p.Num()
618+
currentConns := p.getConns()
619+
maxDelta := maxConns - numCurrent
620+
cappedIncrease := min(increaseDelta, maxDelta)
621+
622+
if cappedIncrease <= 0 {
623+
return false
624+
}
625+
626+
// LONG SECTION<START>
627+
// This section can take time as it involves creating conn and Prime()
628+
// TODO(): Avoid taking dialMu here.
629+
results := make(chan *connEntry, cappedIncrease)
630+
var wg sync.WaitGroup
631+
632+
for i := 0; i < cappedIncrease; i++ {
633+
wg.Add(1)
634+
go func() {
635+
defer wg.Done()
636+
637+
select {
638+
case <-p.poolCtx.Done():
639+
btopt.Debugf(p.logger, "bigtable_connpool: Context done, skipping connection creation: %v\n", p.poolCtx.Err())
640+
return
641+
default:
642+
}
643+
644+
conn, err := p.dial()
645+
if err != nil {
646+
btopt.Debugf(p.logger, "bigtable_connpool: Failed to dial new connection for scale up: %v\n", err)
647+
return
648+
}
649+
650+
err = conn.Prime(p.poolCtx, p.instanceName, p.appProfile, p.featureFlagsMD)
651+
if err != nil {
652+
btopt.Debugf(p.logger, "bigtable_connpool: Failed to prime new connection: %v. Connection will not be added.\n", err)
653+
conn.Close()
654+
return
655+
}
656+
657+
results <- &connEntry{conn: conn}
658+
}()
659+
}
660+
// Goroutine to close the results channel once all workers are done.
661+
go func() {
662+
wg.Wait()
663+
close(results)
664+
}()
665+
666+
newEntries := make([]*connEntry, 0, cappedIncrease)
667+
for entry := range results {
668+
newEntries = append(newEntries, entry)
669+
}
670+
671+
if len(newEntries) == 0 {
672+
btopt.Debugf(p.logger, "bigtable_connpool: No new connections were successfully created and primed.\n")
673+
return false
674+
}
675+
676+
// LONG SECTION<END>
677+
678+
// add now
679+
combinedConns := make([]*connEntry, numCurrent+len(newEntries))
680+
copy(combinedConns, currentConns)
681+
copy(combinedConns[numCurrent:], newEntries)
682+
p.conns.Store(&combinedConns)
683+
684+
btopt.Debugf(p.logger, "bigtable_connpool: Added %d connections, new size: %d\n", numCurrent+len(newEntries), len(combinedConns))
685+
return true
686+
}
687+
688+
type entryWithAge struct {
689+
entry *connEntry
690+
createdAt int64
691+
}
692+
693+
// removeConnections returns true if the pool size changed. It removes the oldest connections available in the conns.
694+
func (p *BigtableChannelPool) removeConnections(decreaseDelta, minConns, maxRemoveConns int) bool {
695+
// the critical section is very short
696+
// as we just need to sort the conns and get rid of n old connections.
697+
p.dialMu.Lock()
698+
699+
if decreaseDelta <= 0 {
700+
p.dialMu.Unlock()
701+
return false
702+
}
703+
snapshotConns := p.getConns()
704+
numSnapshot := len(snapshotConns)
705+
706+
if numSnapshot <= minConns {
707+
p.dialMu.Unlock()
708+
btopt.Debugf(p.logger, "bigtable_connpool: Removal skippped, current size %d <= minConns %d\n", numSnapshot, minConns)
709+
return false
710+
}
711+
712+
// the max we can decrease is min(maxRemoveConns, min(decreaseDelta, numSnapshot - minConns))
713+
cappedDecrease := min(maxRemoveConns, min(decreaseDelta, numSnapshot-minConns))
714+
715+
if cappedDecrease <= 0 {
716+
p.dialMu.Unlock()
717+
return false
718+
}
719+
720+
entries := make([]entryWithAge, numSnapshot)
721+
for i, entry := range snapshotConns {
722+
// Only consider connections not *already* draining for removal via this logic.
723+
if !entry.isDraining() {
724+
entries[i] = entryWithAge{entry: entry, createdAt: entry.conn.creationTime()}
725+
}
726+
}
727+
728+
// Sort by creation time, oldest first
729+
sort.Slice(entries, func(i, j int) bool {
730+
return entries[i].entry.createdAt() < entries[j].entry.createdAt()
731+
})
732+
733+
// Select the oldest non-draining connections to mark for draining.
734+
connsToDrain := make([]*connEntry, 0, cappedDecrease)
735+
for i := 0; i < cappedDecrease; i++ {
736+
connsToDrain = append(connsToDrain, entries[i].entry)
737+
entries[i].entry.markAsDraining()
738+
}
739+
740+
// Build the slice of connections to keep
741+
// maintains all connections from the snapshot EXCEPT the ones we just
742+
// explicitly marked for removal/draining in this method.
743+
connsToKeep := make([]*connEntry, 0, numSnapshot-cappedDecrease)
744+
for _, entry := range snapshotConns {
745+
if !entry.isDraining() {
746+
connsToKeep = append(connsToKeep, entry)
747+
}
748+
}
749+
750+
p.conns.Store(&connsToKeep) // new slice
751+
// Release the lock
752+
p.dialMu.Unlock()
753+
754+
btopt.Debugf(p.logger, "bigtable_connpool: Marked %d oldest connections for draining, new pool size: %d\n", len(connsToDrain), len(connsToKeep))
755+
// Initiate graceful shutdown for the connections in connsToDrain.
756+
for _, entry := range connsToDrain {
757+
go p.waitForDrainAndClose(entry)
758+
}
759+
return len(connsToDrain) > 0
760+
761+
}
762+
587763
type multiError []error
588764

589765
func (m multiError) Error() string {

bigtable/internal/transport/connpool_helper_test.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,24 @@ func (s *fakeService) StreamingCall(stream testpb.BenchmarkService_StreamingCall
118118
}
119119
}
120120

121+
func (f *fakeService) reset() {
122+
f.mu.Lock()
123+
defer f.mu.Unlock()
124+
f.callCount = 0
125+
f.pingCount = 0
126+
f.serverErr = nil
127+
f.pingErr = nil
128+
f.delay = 0
129+
f.lastPingAndWarmMetadata = nil
130+
if f.streamSema != nil {
131+
select {
132+
case <-f.streamSema: // Drain if not closed
133+
default:
134+
}
135+
}
136+
f.streamSema = nil
137+
}
138+
121139
func (s *fakeService) PingAndWarm(ctx context.Context, req *btpb.PingAndWarmRequest) (*btpb.PingAndWarmResponse, error) {
122140
s.mu.Lock()
123141
s.pingCount++
@@ -188,7 +206,6 @@ func setupTestServer(t testing.TB, service *fakeService) string {
188206
srv.Stop()
189207
lis.Close()
190208
})
191-
192209
return lis.Addr().String()
193210
}
194211

0 commit comments

Comments
 (0)