@@ -24,6 +24,7 @@ import (
2424 "math/rand"
2525 "net/url"
2626 "slices"
27+ "sort"
2728 "sync"
2829 "sync/atomic"
2930 "time"
@@ -95,6 +96,7 @@ var _ gtransport.ConnPool = &BigtableChannelPool{}
// BigtableConn wraps a gRPC client connection with Bigtable-specific
// connection metadata used by the channel pool.
type BigtableConn struct {
	*grpc.ClientConn
	// isALTSConn records whether the underlying transport negotiated ALTS.
	isALTSConn atomic.Bool
	// createdAt is the connection creation time in milliseconds since the
	// Unix epoch; stored once in NewBigtableConn and read via creationTime().
	createdAt atomic.Int64
}
99101
100102// Prime sends a PingAndWarm request to warm up the connection.
@@ -131,9 +133,17 @@ func (bc *BigtableConn) Prime(ctx context.Context, fullInstanceName, appProfileI
131133
132134// NewBigtableConn creates a wrapped grpc Client Conn
133135func NewBigtableConn (conn * grpc.ClientConn ) * BigtableConn {
134- return & BigtableConn {
136+ bc := & BigtableConn {
135137 ClientConn : conn ,
136138 }
139+ bc .createdAt .Store (time .Now ().UnixMilli ())
140+ return bc
141+
142+ }
143+
// creationTime returns the connection's creation time as int64 milliseconds
// since the Unix epoch, as stored by NewBigtableConn.
func (bc *BigtableConn) creationTime() int64 {
	return bc.createdAt.Load()
}
138148
139149// connEntry represents a single connection in the pool.
@@ -155,6 +165,15 @@ func (e *connEntry) isALTSUsed() bool {
155165 return e .conn .isALTSConn .Load ()
156166}
157167
// createdAt returns the creation time (milliseconds since the Unix epoch)
// of the connection held by this entry.
// It returns the zero value if conn is nil.
func (e *connEntry) createdAt() int64 {
	if e.conn == nil {
		return 0
	}
	return e.conn.creationTime()
}
176+
158177// isDraining atomically checks if the connection is in the draining state.
159178func (e * connEntry ) isDraining () bool {
160179 return e .drainingState .Load ()
@@ -584,6 +603,163 @@ func (s *refCountedStream) decrementLoad() {
584603 })
585604}
586605
606+ // addConnections returns true if the pool size changed.
607+ // TODO: addConnections has a long section where we dial and prime the connections.
608+ // Currently, we are taking dialMu() throughout the section and dialMu() is also required for
609+ // replaceConnection().
610+ //
611+ // Note that DynamicScaleMonitor allows only one evaluateAndScale as it takes a mutex
612+ // during evaluateAndScale so don't expect any size changes in conns
613+ func (p * BigtableChannelPool ) addConnections (increaseDelta , maxConns int ) bool {
614+ // dialMu access
615+ p .dialMu .Lock ()
616+ defer p .dialMu .Unlock ()
617+ numCurrent := p .Num ()
618+ currentConns := p .getConns ()
619+ maxDelta := maxConns - numCurrent
620+ cappedIncrease := min (increaseDelta , maxDelta )
621+
622+ if cappedIncrease <= 0 {
623+ return false
624+ }
625+
626+ // LONG SECTION<START>
627+ // This section can take time as it involves creating conn and Prime()
628+ // TODO(): Avoid taking dialMu here.
629+ results := make (chan * connEntry , cappedIncrease )
630+ var wg sync.WaitGroup
631+
632+ for i := 0 ; i < cappedIncrease ; i ++ {
633+ wg .Add (1 )
634+ go func () {
635+ defer wg .Done ()
636+
637+ select {
638+ case <- p .poolCtx .Done ():
639+ btopt .Debugf (p .logger , "bigtable_connpool: Context done, skipping connection creation: %v\n " , p .poolCtx .Err ())
640+ return
641+ default :
642+ }
643+
644+ conn , err := p .dial ()
645+ if err != nil {
646+ btopt .Debugf (p .logger , "bigtable_connpool: Failed to dial new connection for scale up: %v\n " , err )
647+ return
648+ }
649+
650+ err = conn .Prime (p .poolCtx , p .instanceName , p .appProfile , p .featureFlagsMD )
651+ if err != nil {
652+ btopt .Debugf (p .logger , "bigtable_connpool: Failed to prime new connection: %v. Connection will not be added.\n " , err )
653+ conn .Close ()
654+ return
655+ }
656+
657+ results <- & connEntry {conn : conn }
658+ }()
659+ }
660+ // Goroutine to close the results channel once all workers are done.
661+ go func () {
662+ wg .Wait ()
663+ close (results )
664+ }()
665+
666+ newEntries := make ([]* connEntry , 0 , cappedIncrease )
667+ for entry := range results {
668+ newEntries = append (newEntries , entry )
669+ }
670+
671+ if len (newEntries ) == 0 {
672+ btopt .Debugf (p .logger , "bigtable_connpool: No new connections were successfully created and primed.\n " )
673+ return false
674+ }
675+
676+ // LONG SECTION<END>
677+
678+ // add now
679+ combinedConns := make ([]* connEntry , numCurrent + len (newEntries ))
680+ copy (combinedConns , currentConns )
681+ copy (combinedConns [numCurrent :], newEntries )
682+ p .conns .Store (& combinedConns )
683+
684+ btopt .Debugf (p .logger , "bigtable_connpool: Added %d connections, new size: %d\n " , numCurrent + len (newEntries ), len (combinedConns ))
685+ return true
686+ }
687+
// entryWithAge pairs a connection entry with its cached creation time
// (milliseconds since the Unix epoch) so removal candidates can be sorted
// by age without re-reading the atomic on every comparison.
type entryWithAge struct {
	entry     *connEntry
	createdAt int64
}
692+
693+ // removeConnections returns true if the pool size changed. It removes the oldest connections available in the conns.
694+ func (p * BigtableChannelPool ) removeConnections (decreaseDelta , minConns , maxRemoveConns int ) bool {
695+ // the critical section is very short
696+ // as we just need to sort the conns and get rid of n old connections.
697+ p .dialMu .Lock ()
698+
699+ if decreaseDelta <= 0 {
700+ p .dialMu .Unlock ()
701+ return false
702+ }
703+ snapshotConns := p .getConns ()
704+ numSnapshot := len (snapshotConns )
705+
706+ if numSnapshot <= minConns {
707+ p .dialMu .Unlock ()
708+ btopt .Debugf (p .logger , "bigtable_connpool: Removal skippped, current size %d <= minConns %d\n " , numSnapshot , minConns )
709+ return false
710+ }
711+
712+ // the max we can decrease is min(maxRemoveConns, min(decreaseDelta, numSnapshot - minConns))
713+ cappedDecrease := min (maxRemoveConns , min (decreaseDelta , numSnapshot - minConns ))
714+
715+ if cappedDecrease <= 0 {
716+ p .dialMu .Unlock ()
717+ return false
718+ }
719+
720+ entries := make ([]entryWithAge , numSnapshot )
721+ for i , entry := range snapshotConns {
722+ // Only consider connections not *already* draining for removal via this logic.
723+ if ! entry .isDraining () {
724+ entries [i ] = entryWithAge {entry : entry , createdAt : entry .conn .creationTime ()}
725+ }
726+ }
727+
728+ // Sort by creation time, oldest first
729+ sort .Slice (entries , func (i , j int ) bool {
730+ return entries [i ].entry .createdAt () < entries [j ].entry .createdAt ()
731+ })
732+
733+ // Select the oldest non-draining connections to mark for draining.
734+ connsToDrain := make ([]* connEntry , 0 , cappedDecrease )
735+ for i := 0 ; i < cappedDecrease ; i ++ {
736+ connsToDrain = append (connsToDrain , entries [i ].entry )
737+ entries [i ].entry .markAsDraining ()
738+ }
739+
740+ // Build the slice of connections to keep
741+ // maintains all connections from the snapshot EXCEPT the ones we just
742+ // explicitly marked for removal/draining in this method.
743+ connsToKeep := make ([]* connEntry , 0 , numSnapshot - cappedDecrease )
744+ for _ , entry := range snapshotConns {
745+ if ! entry .isDraining () {
746+ connsToKeep = append (connsToKeep , entry )
747+ }
748+ }
749+
750+ p .conns .Store (& connsToKeep ) // new slice
751+ // Release the lock
752+ p .dialMu .Unlock ()
753+
754+ btopt .Debugf (p .logger , "bigtable_connpool: Marked %d oldest connections for draining, new pool size: %d\n " , len (connsToDrain ), len (connsToKeep ))
755+ // Initiate graceful shutdown for the connections in connsToDrain.
756+ for _ , entry := range connsToDrain {
757+ go p .waitForDrainAndClose (entry )
758+ }
759+ return len (connsToDrain ) > 0
760+
761+ }
762+
587763type multiError []error
588764
589765func (m multiError ) Error () string {
0 commit comments