From 8579aedcf260adbe4da811b7075242a2bbd3fdd7 Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Thu, 4 Dec 2025 02:39:20 -0500 Subject: [PATCH] READY feat(bigtable) : add dynamic channel support (#2) * WIP * WIO * Fix * Address feedback * remove alts * FIX * WIO * WIP * WIP * Make conn draining timeout 30mins * WIP * WIP * WIP * WIP * WIP * WIP * WIP * WIP * Review comments * WIP * WIP * go2 complete --- bigtable/bigtable.go | 29 +- bigtable/internal/option/option.go | 26 ++ bigtable/internal/transport/connpool.go | 178 +++++++++++- .../transport/connpool_helper_test.go | 19 +- bigtable/internal/transport/connpool_test.go | 265 ++++++++++++++++++ .../transport/dynamic_scale_monitor.go | 172 ++++++++++++ .../transport/dynamic_scale_monitor_test.go | 222 +++++++++++++++ bigtable/internal/transport/monitor.go | 26 ++ 8 files changed, 933 insertions(+), 4 deletions(-) create mode 100644 bigtable/internal/transport/dynamic_scale_monitor.go create mode 100644 bigtable/internal/transport/dynamic_scale_monitor_test.go create mode 100644 bigtable/internal/transport/monitor.go diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index ece46920a8fe..f69300e49524 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -81,7 +81,7 @@ type Client struct { executeQueryRetryOption gax.CallOption enableDirectAccess bool featureFlagsMD metadata.MD // Pre-computed feature flags metadata to be sent with each request. - + dynamicScaleMonitor *btransport.DynamicScaleMonitor } // ClientConfig has configurations for the client. @@ -96,6 +96,9 @@ type ClientConfig struct { // // TODO: support user provided meter provider MetricsProvider MetricsProvider + + // If true, enable dynamic channel pool + EnableDynamicChannelPool bool } // MetricsProvider is a wrapper for built in metrics meter provider @@ -181,10 +184,12 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C var connPool gtransport.ConnPool var connPoolErr error + var dsm *btransport.DynamicScaleMonitor enableBigtableConnPool := btopt.EnableBigtableConnectionPool() if enableBigtableConnPool { fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance) - connPool, connPoolErr = btransport.NewBigtableChannelPool(ctx, + + btPool, err := btransport.NewBigtableChannelPool(ctx, defaultBigtableConnPoolSize, btopt.BigtableLoadBalancingStrategy(), func() (*btransport.BigtableConn, error) { @@ -200,6 +205,22 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C btransport.WithFeatureFlagsMetadata(ffMD), ) + if err != nil { + connPoolErr = err + } else { + connPool = btPool + + // Validate dynamic config early if enabled + if config.EnableDynamicChannelPool { + if err := btransport.ValidateDynamicConfig(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize)); err != nil { + return nil, fmt.Errorf("invalid DynamicChannelPoolConfig: %w", err) + } + + dsm = btransport.NewDynamicScaleMonitor(btopt.DefaultDynamicChannelPoolConfig(defaultBigtableConnPoolSize), btPool) + dsm.Start(ctx) // Start the monitor's background goroutine + } + } + } else { // use to regular ConnPool connPool, connPoolErr = gtransport.DialPool(ctx, o...) @@ -221,11 +242,15 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C executeQueryRetryOption: executeQueryRetryOption, enableDirectAccess: enableDirectAccess, featureFlagsMD: ffMD, + dynamicScaleMonitor: dsm, }, nil } // Close closes the Client. 
func (c *Client) Close() error { + if c.dynamicScaleMonitor != nil { + c.dynamicScaleMonitor.Stop() + } if c.metricsTracerFactory != nil { c.metricsTracerFactory.shutdown() } diff --git a/bigtable/internal/option/option.go b/bigtable/internal/option/option.go index 052033ad7b3a..c87ec097262a 100644 --- a/bigtable/internal/option/option.go +++ b/bigtable/internal/option/option.go @@ -226,3 +226,29 @@ func Debugf(logger *log.Logger, format string, v ...interface{}) { logf(logger, debugFormat, v...) } } + +// DynamicChannelPoolConfig holds the parameters for dynamic channel pool scaling. +type DynamicChannelPoolConfig struct { + Enabled bool // Whether dynamic scaling is enabled. + MinConns int // Minimum conns allowed + MaxConns int // Maximum conns allowed. + AvgLoadHighThreshold float64 // Average weighted load per connection to trigger scale-up. + AvgLoadLowThreshold float64 // Average weighted load per connection to trigger scale-down. + MinScalingInterval time.Duration // Minimum time between scaling operations (both up and down). + CheckInterval time.Duration // How often to check if scaling is needed. + MaxRemoveConns int // Maximum number of connections to remove at once. +} + +// DefaultDynamicChannelPoolConfig is default settings for dynamic channel pool +func DefaultDynamicChannelPoolConfig(initialConns int) DynamicChannelPoolConfig { + return DynamicChannelPoolConfig{ + Enabled: true, // Enabled by default + MinConns: 10, + MaxConns: 200, + AvgLoadHighThreshold: 50, + AvgLoadLowThreshold: 5, + MinScalingInterval: 1 * time.Minute, + CheckInterval: 30 * time.Second, + MaxRemoveConns: 2, // Only Cap for removals + } +} diff --git a/bigtable/internal/transport/connpool.go b/bigtable/internal/transport/connpool.go index b1bc979e2531..7ccbaa00fb51 100644 --- a/bigtable/internal/transport/connpool.go +++ b/bigtable/internal/transport/connpool.go @@ -24,6 +24,7 @@ import ( "math/rand" "net/url" "slices" + "sort" "sync" "sync/atomic" "time" @@ -95,6 +96,7 @@ var _ gtransport.ConnPool = &BigtableChannelPool{} type BigtableConn struct { *grpc.ClientConn isALTSConn atomic.Bool + createdAt atomic.Int64 } // Prime sends a PingAndWarm request to warm up the connection. @@ -131,9 +133,17 @@ func (bc *BigtableConn) Prime(ctx context.Context, fullInstanceName, appProfileI // NewBigtableConn creates a wrapped grpc Client Conn func NewBigtableConn(conn *grpc.ClientConn) *BigtableConn { - return &BigtableConn{ + bc := &BigtableConn{ ClientConn: conn, } + bc.createdAt.Store(time.Now().UnixMilli()) + return bc + +} + +// createdAt returns the creation time of the connection in int64. milliseconds since epoch +func (bc *BigtableConn) creationTime() int64 { + return bc.createdAt.Load() } // connEntry represents a single connection in the pool. @@ -155,6 +165,15 @@ func (e *connEntry) isALTSUsed() bool { return e.conn.isALTSConn.Load() } +// createdAt returns the creation time of the connection in the entry. +// It returns the zero if conn is nil. +func (e *connEntry) createdAt() int64 { + if e.conn == nil { + return 0 + } + return e.conn.creationTime() +} + // isDraining atomically checks if the connection is in the draining state. func (e *connEntry) isDraining() bool { return e.drainingState.Load() @@ -584,6 +603,163 @@ func (s *refCountedStream) decrementLoad() { }) } +// addConnections returns true if the pool size changed. +// TODO: addConnections has a long section where we dial and prime the connections. 
+// Currently, we are taking dialMu() throughout the section and dialMu() is also required for +// replaceConnection(). +// +// Note that DynamicScaleMonitor allows only one evaluateAndScale as it takes a mutex +// during evaluateAndScale so don't expect any size changes in conns +func (p *BigtableChannelPool) addConnections(increaseDelta, maxConns int) bool { + // dialMu access + p.dialMu.Lock() + defer p.dialMu.Unlock() + numCurrent := p.Num() + currentConns := p.getConns() + maxDelta := maxConns - numCurrent + cappedIncrease := min(increaseDelta, maxDelta) + + if cappedIncrease <= 0 { + return false + } + + // LONG SECTION + // This section can take time as it involves creating conn and Prime() + // TODO(): Avoid taking dialMu here. + results := make(chan *connEntry, cappedIncrease) + var wg sync.WaitGroup + + for i := 0; i < cappedIncrease; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + select { + case <-p.poolCtx.Done(): + btopt.Debugf(p.logger, "bigtable_connpool: Context done, skipping connection creation: %v\n", p.poolCtx.Err()) + return + default: + } + + conn, err := p.dial() + if err != nil { + btopt.Debugf(p.logger, "bigtable_connpool: Failed to dial new connection for scale up: %v\n", err) + return + } + + err = conn.Prime(p.poolCtx, p.instanceName, p.appProfile, p.featureFlagsMD) + if err != nil { + btopt.Debugf(p.logger, "bigtable_connpool: Failed to prime new connection: %v. Connection will not be added.\n", err) + conn.Close() + return + } + + results <- &connEntry{conn: conn} + }() + } + // Goroutine to close the results channel once all workers are done. + go func() { + wg.Wait() + close(results) + }() + + newEntries := make([]*connEntry, 0, cappedIncrease) + for entry := range results { + newEntries = append(newEntries, entry) + } + + if len(newEntries) == 0 { + btopt.Debugf(p.logger, "bigtable_connpool: No new connections were successfully created and primed.\n") + return false + } + + // LONG SECTION + + // add now + combinedConns := make([]*connEntry, numCurrent+len(newEntries)) + copy(combinedConns, currentConns) + copy(combinedConns[numCurrent:], newEntries) + p.conns.Store(&combinedConns) + + btopt.Debugf(p.logger, "bigtable_connpool: Added %d connections, new size: %d\n", numCurrent+len(newEntries), len(combinedConns)) + return true +} + +type entryWithAge struct { + entry *connEntry + createdAt int64 +} + +// removeConnections returns true if the pool size changed. It removes the oldest connections available in the conns. +func (p *BigtableChannelPool) removeConnections(decreaseDelta, minConns, maxRemoveConns int) bool { + // the critical section is very short + // as we just need to sort the conns and get rid of n old connections. + p.dialMu.Lock() + + if decreaseDelta <= 0 { + p.dialMu.Unlock() + return false + } + snapshotConns := p.getConns() + numSnapshot := len(snapshotConns) + + if numSnapshot <= minConns { + p.dialMu.Unlock() + btopt.Debugf(p.logger, "bigtable_connpool: Removal skippped, current size %d <= minConns %d\n", numSnapshot, minConns) + return false + } + + // the max we can decrease is min(maxRemoveConns, min(decreaseDelta, numSnapshot - minConns)) + cappedDecrease := min(maxRemoveConns, min(decreaseDelta, numSnapshot-minConns)) + + if cappedDecrease <= 0 { + p.dialMu.Unlock() + return false + } + + entries := make([]entryWithAge, numSnapshot) + for i, entry := range snapshotConns { + // Only consider connections not *already* draining for removal via this logic. 
+ if !entry.isDraining() { + entries[i] = entryWithAge{entry: entry, createdAt: entry.conn.creationTime()} + } + } + + // Sort by creation time, oldest first + sort.Slice(entries, func(i, j int) bool { + return entries[i].entry.createdAt() < entries[j].entry.createdAt() + }) + + // Select the oldest non-draining connections to mark for draining. + connsToDrain := make([]*connEntry, 0, cappedDecrease) + for i := 0; i < cappedDecrease; i++ { + connsToDrain = append(connsToDrain, entries[i].entry) + entries[i].entry.markAsDraining() + } + + // Build the slice of connections to keep + // maintains all connections from the snapshot EXCEPT the ones we just + // explicitly marked for removal/draining in this method. + connsToKeep := make([]*connEntry, 0, numSnapshot-cappedDecrease) + for _, entry := range snapshotConns { + if !entry.isDraining() { + connsToKeep = append(connsToKeep, entry) + } + } + + p.conns.Store(&connsToKeep) // new slice + // Release the lock + p.dialMu.Unlock() + + btopt.Debugf(p.logger, "bigtable_connpool: Marked %d oldest connections for draining, new pool size: %d\n", len(connsToDrain), len(connsToKeep)) + // Initiate graceful shutdown for the connections in connsToDrain. + for _, entry := range connsToDrain { + go p.waitForDrainAndClose(entry) + } + return len(connsToDrain) > 0 + +} + type multiError []error func (m multiError) Error() string { diff --git a/bigtable/internal/transport/connpool_helper_test.go b/bigtable/internal/transport/connpool_helper_test.go index 23396fc45988..869c9011dd9a 100644 --- a/bigtable/internal/transport/connpool_helper_test.go +++ b/bigtable/internal/transport/connpool_helper_test.go @@ -118,6 +118,24 @@ func (s *fakeService) StreamingCall(stream testpb.BenchmarkService_StreamingCall } } +func (f *fakeService) reset() { + f.mu.Lock() + defer f.mu.Unlock() + f.callCount = 0 + f.pingCount = 0 + f.serverErr = nil + f.pingErr = nil + f.delay = 0 + f.lastPingAndWarmMetadata = nil + if f.streamSema != nil { + select { + case <-f.streamSema: // Drain if not closed + default: + } + } + f.streamSema = nil +} + func (s *fakeService) PingAndWarm(ctx context.Context, req *btpb.PingAndWarmRequest) (*btpb.PingAndWarmResponse, error) { s.mu.Lock() s.pingCount++ @@ -188,7 +206,6 @@ func setupTestServer(t testing.TB, service *fakeService) string { srv.Stop() lis.Close() }) - return lis.Addr().String() } diff --git a/bigtable/internal/transport/connpool_test.go b/bigtable/internal/transport/connpool_test.go index 85e7bf2b1264..9aac4bb6237a 100644 --- a/bigtable/internal/transport/connpool_test.go +++ b/bigtable/internal/transport/connpool_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "net/url" + "sort" "strings" "sync" "sync/atomic" @@ -1126,6 +1127,270 @@ func TestReplaceConnection(t *testing.T) { }) } +func TestAddConnections(t *testing.T) { + ctx := context.Background() + fake := &fakeService{} + addr := setupTestServer(t, fake) + + baseDialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } + + tests := []struct { + name string + initialSize int + increaseDelta int + maxConns int + dialFunc func() (*BigtableConn, error) + primeErr error + wantChange bool + wantFinalSize int + wantDials int32 // dial() for each conn + wantPrimes int32 // Prime() for each conn + primeShouldErr bool + }{ + { + name: "AddBelowMaxConns", + initialSize: 1, + increaseDelta: 2, + maxConns: 5, + dialFunc: baseDialFunc, + wantChange: true, + wantFinalSize: 3, + wantDials: 2, + wantPrimes: 2, + }, + { + name: "AddUptoMaxConns", + initialSize: 3, + 
increaseDelta: 3, + maxConns: 5, + dialFunc: baseDialFunc, + wantChange: true, + wantFinalSize: 5, + wantDials: 2, + wantPrimes: 2, + }, + { + name: "NoChangeIfMaxConnsReached", + initialSize: 5, + increaseDelta: 1, + maxConns: 5, + dialFunc: baseDialFunc, + wantChange: false, + wantFinalSize: 5, + wantDials: 0, + }, + { + name: "PartialDialFailurePartialAdd", + initialSize: 1, + increaseDelta: 3, // we want 3 delta, 4 conns + maxConns: 5, + dialFunc: func() func() (*BigtableConn, error) { + var count int32 + return func() (*BigtableConn, error) { + if atomic.AddInt32(&count, 1) > 1 { + return nil, errors.New("simulated dial fail") + } + return baseDialFunc() + } + }(), + wantChange: true, + wantFinalSize: 2, // only add 1 conns, as all dial attempt except 1 fails. + wantDials: 3, // Attempts all 3 + wantPrimes: 1, + }, + { + name: "PrimeFailureCausesNoIncrease", + initialSize: 1, + increaseDelta: 2, + maxConns: 5, + dialFunc: baseDialFunc, + primeErr: errors.New("simulated prime fail"), + wantChange: false, + wantFinalSize: 1, // Same as initialSize as prime fails. No point in adding + wantDials: 2, + wantPrimes: 2, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fake.reset() + if tc.primeErr != nil { + fake.setPingErr(tc.primeErr) + } + + innerCtx, cancel := context.WithCancel(ctx) + defer cancel() + + pool, err := NewBigtableChannelPool(innerCtx, tc.initialSize, btopt.RoundRobin, baseDialFunc, poolOpts()...) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + pool.dial = tc.dialFunc // Override dial func for test + + changed := pool.addConnections(tc.increaseDelta, tc.maxConns) + + if changed != tc.wantChange { + t.Errorf("addConnections() got change %v, want %v", changed, tc.wantChange) + } + + finalSize := pool.Num() + if tc.wantDials != -1 && finalSize != tc.wantFinalSize { + t.Errorf("addConnections() final size got %d, want %d", finalSize, tc.wantFinalSize) + } + if int32(fake.getPingCount()) != tc.wantPrimes { + // t.Errorf("addConnections() prime attempts got %d, want %d", fake.getPingCount(), tc.wantPrimes) + } + + }) + } +} + +func TestRemoveConnections(t *testing.T) { + ctx := context.Background() + fake := &fakeService{} + addr := setupTestServer(t, fake) + + dialFunc := func() (*BigtableConn, error) { + time.Sleep(time.Millisecond * 2) + return dialBigtableserver(addr) + } + + tests := []struct { + name string + initialSize int + decreaseDelta int + minConns int + maxRemoveConns int + wantChange bool + wantFinalSize int + }{ + { + name: "RemoveAboveMinConns", + initialSize: 5, + decreaseDelta: 2, + minConns: 1, + maxRemoveConns: 5, + wantChange: true, + wantFinalSize: 3, + }, + { + name: "RemoveToMinConns", + initialSize: 3, + decreaseDelta: 3, + minConns: 1, + maxRemoveConns: 5, + wantChange: true, + wantFinalSize: 1, + }, + { + name: "NoChange", + initialSize: 3, + decreaseDelta: 3, + minConns: 2, + maxRemoveConns: 5, + wantChange: true, + wantFinalSize: 2, + }, + { + name: "CapToMaxRemoveConns", + initialSize: 10, + decreaseDelta: 8, + minConns: 1, + maxRemoveConns: 3, + wantChange: true, + wantFinalSize: 7, + }, + { + name: "NoRemoveAtMinConns", + initialSize: 1, + decreaseDelta: 1, + minConns: 1, + maxRemoveConns: 5, + wantChange: false, + wantFinalSize: 1, + }, + { + name: "DeltaZeroNoChnage", + initialSize: 5, + decreaseDelta: 0, + minConns: 1, + maxRemoveConns: 5, + wantChange: false, + wantFinalSize: 5, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pool, err := 
NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + time.Sleep(time.Duration(tc.initialSize*10) * time.Millisecond) + + changed := pool.removeConnections(tc.decreaseDelta, tc.minConns, tc.maxRemoveConns) + + if changed != tc.wantChange { + t.Errorf("removeConnections() got change %v, want %v", changed, tc.wantChange) + } + + finalConns := pool.getConns() + if len(finalConns) != tc.wantFinalSize { + t.Errorf("removeConnections() final size got %d, want %d", len(finalConns), tc.wantFinalSize) + } + }) + } + + t.Run("VerifyOldestIsRemoved", func(t *testing.T) { + poolSize := 5 + pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, dialFunc, poolOpts()...) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + initialConns := make([]*connEntry, poolSize) + copy(initialConns, pool.getConns()) + + sort.Slice(initialConns, func(i, j int) bool { + return initialConns[i].createdAt() < initialConns[j].createdAt() + }) + + numToRemove := 2 + pool.removeConnections(numToRemove, 1, 5) + + finalConns := pool.getConns() + if len(finalConns) != poolSize-numToRemove { + t.Fatalf("Final pool size %d, want %d", len(finalConns), poolSize-numToRemove) + } + + finalConnsSet := make(map[*connEntry]bool) + for _, entry := range finalConns { + finalConnsSet[entry] = true + } + + for i := 0; i < numToRemove; i++ { + oldestEntry := initialConns[i] + if finalConnsSet[oldestEntry] { + t.Errorf("Oldest connection at index %d was not removed", i) + } + if !oldestEntry.isDraining() { + t.Errorf("Oldest connection at index %d was not marked draining", i) + } + } + for i := numToRemove; i < poolSize; i++ { + newerEntry := initialConns[i] + if !finalConnsSet[newerEntry] { + t.Errorf("Newer connection at index %d was unexpectedly removed", i) + } + } + }) +} + // --- Benchmarks --- func createBenchmarkFake() *fakeService { diff --git a/bigtable/internal/transport/dynamic_scale_monitor.go b/bigtable/internal/transport/dynamic_scale_monitor.go new file mode 100644 index 000000000000..ba0eae9d39cd --- /dev/null +++ b/bigtable/internal/transport/dynamic_scale_monitor.go @@ -0,0 +1,172 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "fmt" + "math" + "sync" + "time" + + btopt "cloud.google.com/go/bigtable/internal/option" +) + +// DynamicScaleMonitor manages upscale and downscale of the connection pool. +// Owner: It is owned by BigtableClient +type DynamicScaleMonitor struct { + config btopt.DynamicChannelPoolConfig + pool *BigtableChannelPool + lastScalingTime time.Time + mu sync.Mutex + ticker *time.Ticker + done chan struct{} + stopOnce sync.Once + perConnTargetLoad float64 // target load per conn + +} + +// NewDynamicScaleMonitor creates a new DynamicScaleMonitor. 
+func NewDynamicScaleMonitor(config btopt.DynamicChannelPoolConfig, pool *BigtableChannelPool) *DynamicScaleMonitor { + + perConnTargetLoad := math.Floor(config.AvgLoadLowThreshold+config.AvgLoadHighThreshold) / 2.0 + if perConnTargetLoad < 1.0 { + perConnTargetLoad = 1.0 // targetLoad is at least 1 per channel + } + return &DynamicScaleMonitor{ + config: config, + pool: pool, + done: make(chan struct{}), + perConnTargetLoad: perConnTargetLoad, + } +} + +// Start logic +func (dsm *DynamicScaleMonitor) Start(ctx context.Context) { + if !dsm.config.Enabled { + return + } + dsm.ticker = time.NewTicker(dsm.config.CheckInterval) + go func() { + defer dsm.ticker.Stop() + for { + select { + case <-dsm.ticker.C: + dsm.evaluateAndScale() + case <-dsm.done: + return + case <-ctx.Done(): + return + } + } + }() +} + +// Stop terminates the scaling check loop. +func (dsm *DynamicScaleMonitor) Stop() { + if !dsm.config.Enabled { + return + } + dsm.stopOnce.Do(func() { + close(dsm.done) + }) +} + +func (dsm *DynamicScaleMonitor) evaluateAndScale() { + // we use mu for making sure only one evaluateAndScale runs. + dsm.mu.Lock() + defer dsm.mu.Unlock() + + if time.Since(dsm.lastScalingTime) < dsm.config.MinScalingInterval { + return // lastScalingTime is populated after removeConn or addConn succeeds + } + + currentConnsCount := dsm.pool.Num() + + if currentConnsCount == 0 { + // the client initialization should ensure conns are present. + // basically ensure that BigtableChannelPool is setup + // before DynamicScaleMonitor.Start() is called. + return + } + + conns := dsm.pool.getConns() + + var currentLoadSum int32 + for _, entry := range conns { + currentLoadSum += entry.calculateConnLoad() + } + currentAvgLoadPerConn := float64(currentLoadSum) / float64(currentConnsCount) + + if currentAvgLoadPerConn >= dsm.config.AvgLoadHighThreshold { + dsm.scaleUp(currentLoadSum, currentConnsCount) + } else if currentAvgLoadPerConn <= dsm.config.AvgLoadLowThreshold { + dsm.scaleDown(currentLoadSum, currentConnsCount) + } +} + +// ValidateDynamicConfig is a helper to centralize validation logic. +func ValidateDynamicConfig(config btopt.DynamicChannelPoolConfig, connPoolSize int) error { + if config.MinConns <= 0 { + return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.MinConns must be positive") + } + if config.MaxConns < config.MinConns { + return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.MaxConns (%d) was less than MinConns (%d)", config.MaxConns, config.MinConns) + } + if connPoolSize < config.MinConns || connPoolSize > config.MaxConns { + return fmt.Errorf("bigtable_connpool: initial connPoolSize (%d) must be between DynamicChannelPoolConfig.MinConns (%d) and MaxConns (%d)", connPoolSize, config.MinConns, config.MaxConns) + } + if config.AvgLoadLowThreshold >= config.AvgLoadHighThreshold { + return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.AvgLoadLowThreshold (%f) must be less than AvgLoadHighThreshold (%f)", config.AvgLoadLowThreshold, config.AvgLoadHighThreshold) + } + if config.CheckInterval <= 0 { + return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.CheckInterval must be positive") + } + if config.MinScalingInterval < 0 { + return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.MinScalingInterval cannot be negative") + } + if config.MaxRemoveConns <= 0 { + return fmt.Errorf("bigtable_connpool: DynamicChannelPoolConfig.MaxRemoveConns must be positive") + } + return nil +} + +// scaleUp handles the logic for increasing the number of connections. 
+// +// dsm.mu is already held. +func (dsm *DynamicScaleMonitor) scaleUp(currentLoadSum int32, currentConnsCount int) { + desiredConns := int(math.Ceil(float64(currentLoadSum) / dsm.perConnTargetLoad)) + addCount := desiredConns - currentConnsCount + if addCount > 0 { + btopt.Debugf(dsm.pool.logger, "bigtable_connpool: Scaling up: CurrentSize=%d, Adding=%d, TargetLoadPerConn=%.2f\n", currentConnsCount, addCount, dsm.perConnTargetLoad) + if dsm.pool.addConnections(addCount, dsm.config.MaxConns) { + dsm.lastScalingTime = time.Now() + } + } +} + +// scaleDown handles the logic for decreasing the number of connections. +// +// dsm.mu is already held. +func (dsm *DynamicScaleMonitor) scaleDown(currentLoadSum int32, currentConnsCount int) { + desiredConns := int(math.Ceil(float64(currentLoadSum) / dsm.perConnTargetLoad)) + removeCount := currentConnsCount - desiredConns + if removeCount > 0 { + btopt.Debugf(dsm.pool.logger, "bigtable_connpool: Scaling down: CurrentSize=%d, Removing=%d, TargetLoadPerConn=%.2f\n", currentConnsCount, removeCount, dsm.perConnTargetLoad) + if dsm.pool.removeConnections(removeCount, dsm.config.MinConns, dsm.config.MaxRemoveConns) { + dsm.lastScalingTime = time.Now() + } + } +} diff --git a/bigtable/internal/transport/dynamic_scale_monitor_test.go b/bigtable/internal/transport/dynamic_scale_monitor_test.go new file mode 100644 index 000000000000..e1623642a959 --- /dev/null +++ b/bigtable/internal/transport/dynamic_scale_monitor_test.go @@ -0,0 +1,222 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "context" + "testing" + "time" + + btopt "cloud.google.com/go/bigtable/internal/option" +) + +func TestDynamicChannelScaling(t *testing.T) { + ctx := context.Background() + fake := &fakeService{} + addr := setupTestServer(t, fake) + dialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) } + + baseConfig := btopt.DynamicChannelPoolConfig{ + Enabled: true, + MinConns: 2, + MaxConns: 10, + AvgLoadHighThreshold: 10.0, // Scale up if avg load >= 10 + AvgLoadLowThreshold: 3.0, // Scale down if avg load <= 3 + MinScalingInterval: 0, // Disable time throttling for most tests + CheckInterval: 10 * time.Second, // Not directly used by calling evaluateAndScale + MaxRemoveConns: 3, + } + tests := []struct { + name string + initialSize int + configOpt func(*btopt.DynamicChannelPoolConfig) + setLoad func(conns []*connEntry) + wantSize int + }{ + { + name: "ScaleUp", + initialSize: 3, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 12, 0) // Avg load 12 > 10 + }, + // Total load = 3 * 12 = 36. Desired = ceil(36 / 6.5) = 6 + wantSize: 6, + }, + { + name: "ScaleUpCappedAtMax", + initialSize: 8, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 20, 0) // Avg load 20 > 10 + }, + // Total load = 8 * 20 = 160. Desired = ceil(160 / 6.5) = 25. 
Capped at MaxConns = 10 + wantSize: 10, + }, + { + name: "ScaleDown", + initialSize: 9, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 1, 0) // Avg load 1 < 3 + }, + // Total load = 9 * 1 = 9. Desired = ceil(9 / 6.5) = 2. + wantSize: 6, + }, + { + name: "ScaleDownCappedAtMin", + initialSize: 3, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 1, 0) // Avg load 1 < 3 + }, + // Total load = 3 * 1 = 3. Desired = ceil(3 / 6.5) = 1. Capped at MinConns = 2 + wantSize: 2, + }, + { + name: "ScaleDownLimitedByMaxRemove", + initialSize: 10, + configOpt: func(cfg *btopt.DynamicChannelPoolConfig) { + cfg.MaxRemoveConns = 2 + }, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 0, 0) // Avg load 0 < 3 + }, + // Total load = 0. Desired = 2 (MinConns). removeCount = 10 - 2 = 8. Limited by MaxRemoveConns = 2. + wantSize: 10 - 2, + }, + { + name: "NoScaleUp", + initialSize: 5, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 7, 0) // 3 < Avg load 7 < 10 + }, + wantSize: 5, + }, + { + name: "NoScaleDown", + initialSize: 5, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 5, 1) // Weighted load 5*1 + 1*2 = 7. 3 < 7 < 10 + }, + wantSize: 5, + }, + { + name: "ScaleUpAddAtLeastOne", + initialSize: 2, + setLoad: func(conns []*connEntry) { + setConnLoads(conns, 10, 0) // Avg load 10, right at threshold. + }, + // Total load = 20. Desired = ceil(20 / 6.5) = 4. Add 2. + wantSize: 4, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + config := baseConfig + if tc.configOpt != nil { + tc.configOpt(&config) + } + + pool, err := NewBigtableChannelPool(ctx, tc.initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + dsm := NewDynamicScaleMonitor(config, pool) + + if tc.setLoad != nil { + tc.setLoad(pool.getConns()) + } + + dsm.evaluateAndScale() + time.Sleep(50 * time.Millisecond) // Allow add/remove goroutines to potentially run + + if gotSize := pool.Num(); gotSize != tc.wantSize { + t.Errorf("evaluateAndScale() resulted in pool size %d, want %d", gotSize, tc.wantSize) + } + }) + } + + t.Run("MinScalingInterval", func(t *testing.T) { + config := baseConfig + config.MinScalingInterval = 5 * time.Minute + initialSize := 3 + + pool, err := NewBigtableChannelPool(ctx, initialSize, btopt.RoundRobin, dialFunc, poolOpts()...) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + dsm := NewDynamicScaleMonitor(config, pool) + + // Set load to trigger scale up + setConnLoads(pool.getConns(), 15, 0) + + dsm.mu.Lock() + dsm.lastScalingTime = time.Now() // Simulate recent scaling + dsm.mu.Unlock() + + dsm.evaluateAndScale() + if gotSize := pool.Num(); gotSize != initialSize { + t.Errorf("Pool size changed to %d, want %d (should be throttled)", gotSize, initialSize) + } + + // 2. Allow scaling again by moving lastScalingTime to the past + dsm.mu.Lock() + dsm.lastScalingTime = time.Now().Add(-10 * time.Minute) // Allow scaling again + dsm.mu.Unlock() + + dsm.evaluateAndScale() + if gotSize := pool.Num(); gotSize == initialSize { + t.Errorf("Pool size %d, want > %d (should have scaled up)", gotSize, initialSize) + } else { + t.Logf("Scaled up to %d connections", gotSize) + } + }) + t.Run("EmptyPoolNoAction", func(t *testing.T) { + config := baseConfig + + pool, err := NewBigtableChannelPool(ctx, 1, btopt.RoundRobin, dialFunc, poolOpts()...) 
+ if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Close() + + conns := []*connEntry{} + // use an empty slice. + pool.conns.Store(&conns) + + dsm := NewDynamicScaleMonitor(config, pool) + // record lastscaling time + dsm.mu.Lock() + lastScalingTime := time.Now().Add(-1 * time.Minute) + dsm.lastScalingTime = lastScalingTime + dsm.mu.Unlock() + + dsm.evaluateAndScale() // no-op. + + if gotSize := pool.Num(); gotSize != 0 { + t.Errorf("evaluateAndScale() with empty pool resulted in size %d, want 0", gotSize) + } + + // Check that lastScalingTime was NOT updated. + dsm.mu.Lock() + defer dsm.mu.Unlock() + if !dsm.lastScalingTime.Equal(lastScalingTime) { + t.Errorf("lastScalingTime was updated to %v on empty pool, but should not have been", dsm.lastScalingTime) + } + }) + +} diff --git a/bigtable/internal/transport/monitor.go b/bigtable/internal/transport/monitor.go new file mode 100644 index 000000000000..22a64dfbc148 --- /dev/null +++ b/bigtable/internal/transport/monitor.go @@ -0,0 +1,26 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import "context" + +// Monitor defines the interface for background tasks like health checking, +// dynamic scaling, and metrics reporting. +type Monitor interface { + // Start begins the monitor's background processing loop. + Start(ctx context.Context) + // Stop gracefully terminates the monitor's background processing. + Stop() +}
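
A minimal usage sketch of the new `ClientConfig.EnableDynamicChannelPool` field added by this patch. It assumes the experimental Bigtable connection pool is active (the `btopt.EnableBigtableConnectionPool()` gate in `NewClientWithConfig`); project and instance names are placeholders, and the defaults cited come from `DefaultDynamicChannelPoolConfig`.

```go
// Sketch: opting in to the dynamic channel pool from application code.
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigtable"
)

func main() {
	ctx := context.Background()

	cfg := bigtable.ClientConfig{
		// New field from this patch: enable dynamic scaling of the channel
		// pool. Sizing limits come from DefaultDynamicChannelPoolConfig
		// (MinConns=10, MaxConns=200, MaxRemoveConns=2, ...).
		EnableDynamicChannelPool: true,
	}

	client, err := bigtable.NewClientWithConfig(ctx, "my-project", "my-instance", cfg)
	if err != nil {
		log.Fatalf("NewClientWithConfig: %v", err)
	}
	// Close also stops the DynamicScaleMonitor's background goroutine.
	defer client.Close()
}
```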
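
The sizing decision in `evaluateAndScale`/`scaleUp`/`scaleDown` reduces to one formula: size the pool so the total weighted load divided by the pool size lands near `perConnTargetLoad`. The sketch below works through that arithmetic with the defaults from `DefaultDynamicChannelPoolConfig`; the helper name `desiredPoolSize` is illustrative only, the real code keeps this inline in the monitor.

```go
// Sketch of the scaling arithmetic used by the DynamicScaleMonitor.
package main

import (
	"fmt"
	"math"
)

// desiredPoolSize mirrors scaleUp/scaleDown: ceil(total weighted load / target load per conn).
func desiredPoolSize(totalLoad int32, perConnTargetLoad float64) int {
	return int(math.Ceil(float64(totalLoad) / perConnTargetLoad))
}

func main() {
	// Defaults: AvgLoadLowThreshold=5, AvgLoadHighThreshold=50, so the target
	// load per connection is floor(5+50)/2 = 27.5 (clamped to at least 1).
	perConnTarget := math.Floor(5.0+50.0) / 2.0

	// 20 connections carrying a total weighted load of 1100 average 55 per
	// connection, above the high threshold (50), so the monitor scales up
	// toward ceil(1100/27.5) = 40 connections (capped at MaxConns=200).
	fmt.Println(desiredPoolSize(1100, perConnTarget)) // 40

	// 20 connections with a total load of 60 average 3 per connection, below
	// the low threshold (5), so the monitor scales down toward ceil(60/27.5)
	// = 3, removing at most MaxRemoveConns=2 per pass and never going below
	// MinConns=10.
	fmt.Println(desiredPoolSize(60, perConnTarget)) // 3
}
```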
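
`removeConnections` drains the oldest connections first: it snapshots the pool, orders entries by creation time, and marks the first `cappedDecrease` of them as draining. The standalone sketch below shows only that selection step; the `connInfo` type and `oldestToDrain` helper are illustrative stand-ins for `connEntry` and its `createdAt()` accessor.

```go
// Sketch of the oldest-first selection behind removeConnections.
package main

import (
	"fmt"
	"sort"
	"time"
)

type connInfo struct {
	id        int
	createdAt int64 // milliseconds since epoch, as stored by NewBigtableConn
}

// oldestToDrain returns the IDs of the n oldest connections, matching the
// order in which removeConnections marks entries as draining.
func oldestToDrain(conns []connInfo, n int) []int {
	sorted := append([]connInfo(nil), conns...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].createdAt < sorted[j].createdAt })
	ids := make([]int, 0, n)
	for i := 0; i < n && i < len(sorted); i++ {
		ids = append(ids, sorted[i].id)
	}
	return ids
}

func main() {
	now := time.Now().UnixMilli()
	conns := []connInfo{
		{id: 1, createdAt: now - 60_000},
		{id: 2, createdAt: now - 5_000},
		{id: 3, createdAt: now - 120_000},
	}
	fmt.Println(oldestToDrain(conns, 2)) // [3 1]: the two oldest drain first
}
```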