@@ -224,9 +224,11 @@ var DefaultSessionPoolConfig = SessionPoolConfig{
224224 MultiplexSessionCheckInterval : 10 * time .Minute ,
225225}
226226
227- type muxSessionCreateRequest struct {
228- ctx context.Context
229- force bool
227+ type multiplexedSessionCreation struct {
228+ done chan struct {}
229+ cancel context.CancelFunc
230+ once sync.Once
231+ err error
230232}
231233
232234// sessionManager manages multiplexed sessions for a database.
@@ -235,12 +237,10 @@ type sessionManager struct {
235237 valid bool
236238 sc * sessionClient
237239
238- multiplexSessionClientCounter int
239- clientPool []spannerClient
240- multiplexedSession * session
241- multiplexedSessionReq chan muxSessionCreateRequest
242- mayGetMultiplexedSession chan bool
243- multiplexedSessionCreationError error
240+ multiplexSessionClientCounter int
241+ clientPool []spannerClient
242+ multiplexedSession * session
243+ multiplexedSessionCreation * multiplexedSessionCreation
244244
245245 // locationRouter is set when the experimental location API is enabled.
246246 // It is used to wrap round-robin clients with location-aware routing.
@@ -263,14 +263,12 @@ func newSessionManager(sc *sessionClient, config SessionPoolConfig) (*sessionMan
263263 }
264264
265265 sm := & sessionManager {
266- sc : sc ,
267- valid : true ,
268- mayGetMultiplexedSession : make (chan bool ),
269- multiplexedSessionReq : make (chan muxSessionCreateRequest ),
270- SessionPoolConfig : config ,
271- rand : rand .New (rand .NewSource (time .Now ().UnixNano ())),
272- otConfig : sc .otConfig ,
273- done : make (chan struct {}),
266+ sc : sc ,
267+ valid : true ,
268+ SessionPoolConfig : config ,
269+ rand : rand .New (rand .NewSource (time .Now ().UnixNano ())),
270+ otConfig : sc .otConfig ,
271+ done : make (chan struct {}),
274272 }
275273
276274 _ , instance , database , err := parseDatabaseName (sc .database )
@@ -288,21 +286,9 @@ func newSessionManager(sc *sessionClient, config SessionPoolConfig) (*sessionMan
288286 }
289287 sm .tagMap = tag .FromContext (ctx )
290288
291- // Start the multiplexed session creation goroutine
292- go sm .createMultiplexedSession ()
293-
294- // Create the initial multiplexed session
295- ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
296- sm .multiplexedSessionReq <- muxSessionCreateRequest {force : true , ctx : ctx }
297- go func () {
298- select {
299- case <- ctx .Done ():
300- cancel ()
301- return
302- case <- sm .mayGetMultiplexedSession :
303- cancel ()
304- }
305- }()
289+ sm .mu .Lock ()
290+ sm .ensureMultiplexedSessionCreationLocked (true )
291+ sm .mu .Unlock ()
306292
307293 // Start the multiplexed session refresh worker
308294 go sm .multiplexSessionWorker ()
@@ -342,73 +328,72 @@ func (p *sessionManager) recordOTStat(ctx context.Context, m metric.Int64Counter
342328 }
343329}
344330
345- func (p * sessionManager ) createMultiplexedSession () {
346- for c := range p .multiplexedSessionReq {
347- p .mu .Lock ()
348- sess := p .multiplexedSession
349- p .mu .Unlock ()
350- if c .force || sess == nil {
351- p .mu .Lock ()
352- p .sc .mu .Lock ()
353- client , err := p .sc .nextClient ()
354- p .sc .mu .Unlock ()
355- p .mu .Unlock ()
356- if err != nil {
357- p .mu .Lock ()
358- p .multiplexedSessionCreationError = err
359- p .mu .Unlock ()
360- select {
361- case p .mayGetMultiplexedSession <- true :
362- case <- c .ctx .Done ():
363- }
364- continue
365- }
366- p .sc .executeCreateMultiplexedSession (c .ctx , client , p .sc .md , p )
367- continue
368- }
369- select {
370- case p .mayGetMultiplexedSession <- true :
371- case <- c .ctx .Done ():
372- return
373- }
331+ func (p * sessionManager ) ensureMultiplexedSessionCreationLocked (force bool ) * multiplexedSessionCreation {
332+ if p .multiplexedSessionCreation != nil {
333+ return p .multiplexedSessionCreation
374334 }
335+ if ! force && p .multiplexedSession != nil {
336+ return nil
337+ }
338+ ctx , cancel := context .WithCancel (context .Background ())
339+ creation := & multiplexedSessionCreation {
340+ done : make (chan struct {}),
341+ cancel : cancel ,
342+ }
343+ p .multiplexedSessionCreation = creation
344+ go p .runMultiplexedSessionCreation (ctx , creation )
345+ return creation
375346}
376347
377- // sessionReady is called when a session has been created and is ready for use.
378- func (p * sessionManager ) sessionReady (ctx context.Context , s * session ) {
348+ func (p * sessionManager ) runMultiplexedSessionCreation (ctx context.Context , creation * multiplexedSessionCreation ) {
349+ defer creation .cancel ()
350+
379351 p .mu .Lock ()
380- defer p .mu .Unlock ()
381- s .sm = p
382- p .multiplexedSession = s
383- p .multiplexedSessionCreationError = nil
384- p .recordStat (context .Background (), OpenSessionCount , int64 (1 ), tag.Tag {Key : tagKeyIsMultiplexed , Value : "true" })
385- p .recordStat (context .Background (), SessionsCount , 1 , tagNumSessions , tag.Tag {Key : tagKeyIsMultiplexed , Value : "true" })
386- select {
387- case p .mayGetMultiplexedSession <- true :
388- case <- ctx .Done ():
352+ p .sc .mu .Lock ()
353+ client , err := p .sc .nextClient ()
354+ p .sc .mu .Unlock ()
355+ p .mu .Unlock ()
356+ if err != nil {
357+ p .finishMultiplexedSessionCreation (creation , nil , err )
358+ return
389359 }
360+
361+ p .sc .executeCreateMultiplexedSession (ctx , client , p .sc .md , & multiplexedSessionCreationConsumer {
362+ manager : p ,
363+ creation : creation ,
364+ })
390365}
391366
392- // sessionCreationFailed is called when session creation fails.
393- func (p * sessionManager ) sessionCreationFailed (ctx context.Context , err error ) {
394- p .mu .Lock ()
395- defer p .mu .Unlock ()
396- if p .multiplexedSession != nil {
397- p .multiplexedSessionCreationError = nil
398- select {
399- case p .mayGetMultiplexedSession <- true :
400- case <- ctx .Done ():
401- return
367+ func (p * sessionManager ) finishMultiplexedSessionCreation (creation * multiplexedSessionCreation , s * session , err error ) {
368+ creation .once .Do (func () {
369+ p .mu .Lock ()
370+ if p .multiplexedSessionCreation == creation {
371+ p .multiplexedSessionCreation = nil
372+ if p .valid && s != nil {
373+ s .sm = p
374+ p .multiplexedSession = s
375+ p .recordStat (context .Background (), OpenSessionCount , int64 (1 ), tag.Tag {Key : tagKeyIsMultiplexed , Value : "true" })
376+ p .recordStat (context .Background (), SessionsCount , 1 , tagNumSessions , tag.Tag {Key : tagKeyIsMultiplexed , Value : "true" })
377+ }
402378 }
403- return
404- }
405- p .recordStat (context .Background (), OpenSessionCount , int64 (0 ), tag.Tag {Key : tagKeyIsMultiplexed , Value : "true" })
406- p .multiplexedSessionCreationError = err
407- select {
408- case p .mayGetMultiplexedSession <- true :
409- case <- ctx .Done ():
410- return
411- }
379+ p .mu .Unlock ()
380+
381+ creation .err = err
382+ close (creation .done )
383+ })
384+ }
385+
386+ type multiplexedSessionCreationConsumer struct {
387+ manager * sessionManager
388+ creation * multiplexedSessionCreation
389+ }
390+
391+ func (c * multiplexedSessionCreationConsumer ) sessionReady (_ context.Context , s * session ) {
392+ c .manager .finishMultiplexedSessionCreation (c .creation , s , nil )
393+ }
394+
395+ func (c * multiplexedSessionCreationConsumer ) sessionCreationFailed (_ context.Context , err error ) {
396+ c .manager .finishMultiplexedSessionCreation (c .creation , nil , err )
412397}
413398
414399// isValid checks if the session pool is still valid.
@@ -438,9 +423,14 @@ func (p *sessionManager) close(ctx context.Context) {
438423 logf (p .sc .logger , "Failed to unregister callback from the OpenTelemetry meter, error : %v" , err )
439424 }
440425 }
441- p .mu .Unlock ()
442426 p .once .Do (func () { close (p .done ) })
443- close (p .multiplexedSessionReq )
427+ creation := p .multiplexedSessionCreation
428+ p .multiplexedSessionCreation = nil
429+ p .mu .Unlock ()
430+ if creation != nil {
431+ creation .cancel ()
432+ p .finishMultiplexedSessionCreation (creation , nil , errInvalidSession )
433+ }
444434}
445435
446436// errInvalidSession is the error for using an invalid session.
@@ -495,6 +485,7 @@ func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, e
495485 trace .TracePrintf (ctx , nil , "Acquiring a multiplexed session" )
496486 for {
497487 var s * session
488+ var creation * multiplexedSessionCreation
498489 p .mu .Lock ()
499490 if ! p .valid {
500491 p .mu .Unlock ()
@@ -508,9 +499,8 @@ func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, e
508499 p .incNumMultiplexedInUse (ctx )
509500 return p .newSessionHandle (s ), nil
510501 }
511- mayGetSession : = p .mayGetMultiplexedSession
502+ creation = p .ensureMultiplexedSessionCreationLocked ( false )
512503 p .mu .Unlock ()
513- p .multiplexedSessionReq <- muxSessionCreateRequest {force : false , ctx : ctx }
514504 select {
515505 case <- ctx .Done ():
516506 trace .TracePrintf (ctx , nil , "Context done waiting for multiplexed session" )
@@ -519,15 +509,11 @@ func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, e
519509 p .recordOTStat (ctx , p .otConfig .getSessionTimeoutsCount , 1 , recordOTStatOption {attr : p .otConfig .attributeMapWithMultiplexed })
520510 }
521511 return nil , p .errGetSessionTimeout (ctx )
522- case <- mayGetSession :
523- p .mu .Lock ()
524- if p .multiplexedSessionCreationError != nil {
525- trace .TracePrintf (ctx , nil , "Error creating multiplexed session: %v" , p .multiplexedSessionCreationError )
526- err := p .multiplexedSessionCreationError
527- p .mu .Unlock ()
528- return nil , err
512+ case <- creation .done :
513+ if creation .err != nil {
514+ trace .TracePrintf (ctx , nil , "Error creating multiplexed session: %v" , creation .err )
515+ return nil , creation .err
529516 }
530- p .mu .Unlock ()
531517 }
532518 }
533519}
@@ -562,19 +548,18 @@ func (p *sessionManager) multiplexSessionWorker() {
562548 }
563549 p .mu .Unlock ()
564550
565- ctx , cancel := context .WithTimeout (context .Background (), 30 * time .Second )
566551 if createTime .Add (multiplexSessionRefreshInterval ).Before (time .Now ()) {
567552 // Multiplexed session is idle for more than 7 days, replace it.
568- p .multiplexedSessionReq <- muxSessionCreateRequest {force : true , ctx : ctx }
553+ p .mu .Lock ()
554+ creation := p .ensureMultiplexedSessionCreationLocked (true )
555+ p .mu .Unlock ()
569556 // wait for the new multiplexed session to be created.
570557 select {
571- case <- p . mayGetMultiplexedSession :
558+ case <- creation . done :
572559 case <- p .done :
573- cancel ()
574560 return
575561 }
576562 }
577- cancel ()
578563
579564 // Sleep for a while to avoid burning CPU.
580565 select {
0 commit comments