Skip to content

Commit 3e3bd2d

Browse files
authored
fix(spanner): replace multiplexed session request loop with shared in-flight creation (#14215)
Fixes: #14212

Reworked multiplexed session initialization to use a shared in-flight creation latch instead of the old request goroutine and unbuffered channels. This fixes the startup deadlock triggered by a cancelled read-only transaction waiter and adds regression coverage for cancellation and client shutdown while session creation is in flight.

Previously, sessionManager used an unbuffered multiplexedSessionReq channel and a single goroutine to coordinate multiplexed session creation. A cancelled waiter could cause that goroutine to return, leaving future callers blocked forever trying to send on the request channel during takeMultiplexed.

This change replaces that request/ack flow with a shared in-flight creation record on sessionManager. When no multiplexed session exists, the manager starts one background creation with a manager-owned context and exposes a done channel for all waiters in that creation epoch. Waiters now block on either the done channel or their own context, and completion is broadcast by closing the shared channel. This removes the deadlock in which one cancelled context could kill the shared coordination, and it also lets Client.Close release any waiters blocked behind an in-flight initial create.
1 parent 6315105 commit 3e3bd2d

File tree

3 files changed

+167
-108
lines changed

3 files changed

+167
-108
lines changed

spanner/session.go

Lines changed: 93 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,11 @@ var DefaultSessionPoolConfig = SessionPoolConfig{
224224
MultiplexSessionCheckInterval: 10 * time.Minute,
225225
}
226226

227-
type muxSessionCreateRequest struct {
228-
ctx context.Context
229-
force bool
227+
type multiplexedSessionCreation struct {
228+
done chan struct{}
229+
cancel context.CancelFunc
230+
once sync.Once
231+
err error
230232
}
231233

232234
// sessionManager manages multiplexed sessions for a database.
@@ -235,12 +237,10 @@ type sessionManager struct {
235237
valid bool
236238
sc *sessionClient
237239

238-
multiplexSessionClientCounter int
239-
clientPool []spannerClient
240-
multiplexedSession *session
241-
multiplexedSessionReq chan muxSessionCreateRequest
242-
mayGetMultiplexedSession chan bool
243-
multiplexedSessionCreationError error
240+
multiplexSessionClientCounter int
241+
clientPool []spannerClient
242+
multiplexedSession *session
243+
multiplexedSessionCreation *multiplexedSessionCreation
244244

245245
// locationRouter is set when the experimental location API is enabled.
246246
// It is used to wrap round-robin clients with location-aware routing.
@@ -263,14 +263,12 @@ func newSessionManager(sc *sessionClient, config SessionPoolConfig) (*sessionMan
263263
}
264264

265265
sm := &sessionManager{
266-
sc: sc,
267-
valid: true,
268-
mayGetMultiplexedSession: make(chan bool),
269-
multiplexedSessionReq: make(chan muxSessionCreateRequest),
270-
SessionPoolConfig: config,
271-
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
272-
otConfig: sc.otConfig,
273-
done: make(chan struct{}),
266+
sc: sc,
267+
valid: true,
268+
SessionPoolConfig: config,
269+
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
270+
otConfig: sc.otConfig,
271+
done: make(chan struct{}),
274272
}
275273

276274
_, instance, database, err := parseDatabaseName(sc.database)
@@ -288,21 +286,9 @@ func newSessionManager(sc *sessionClient, config SessionPoolConfig) (*sessionMan
288286
}
289287
sm.tagMap = tag.FromContext(ctx)
290288

291-
// Start the multiplexed session creation goroutine
292-
go sm.createMultiplexedSession()
293-
294-
// Create the initial multiplexed session
295-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
296-
sm.multiplexedSessionReq <- muxSessionCreateRequest{force: true, ctx: ctx}
297-
go func() {
298-
select {
299-
case <-ctx.Done():
300-
cancel()
301-
return
302-
case <-sm.mayGetMultiplexedSession:
303-
cancel()
304-
}
305-
}()
289+
sm.mu.Lock()
290+
sm.ensureMultiplexedSessionCreationLocked(true)
291+
sm.mu.Unlock()
306292

307293
// Start the multiplexed session refresh worker
308294
go sm.multiplexSessionWorker()
@@ -342,73 +328,72 @@ func (p *sessionManager) recordOTStat(ctx context.Context, m metric.Int64Counter
342328
}
343329
}
344330

345-
func (p *sessionManager) createMultiplexedSession() {
346-
for c := range p.multiplexedSessionReq {
347-
p.mu.Lock()
348-
sess := p.multiplexedSession
349-
p.mu.Unlock()
350-
if c.force || sess == nil {
351-
p.mu.Lock()
352-
p.sc.mu.Lock()
353-
client, err := p.sc.nextClient()
354-
p.sc.mu.Unlock()
355-
p.mu.Unlock()
356-
if err != nil {
357-
p.mu.Lock()
358-
p.multiplexedSessionCreationError = err
359-
p.mu.Unlock()
360-
select {
361-
case p.mayGetMultiplexedSession <- true:
362-
case <-c.ctx.Done():
363-
}
364-
continue
365-
}
366-
p.sc.executeCreateMultiplexedSession(c.ctx, client, p.sc.md, p)
367-
continue
368-
}
369-
select {
370-
case p.mayGetMultiplexedSession <- true:
371-
case <-c.ctx.Done():
372-
return
373-
}
331+
func (p *sessionManager) ensureMultiplexedSessionCreationLocked(force bool) *multiplexedSessionCreation {
332+
if p.multiplexedSessionCreation != nil {
333+
return p.multiplexedSessionCreation
374334
}
335+
if !force && p.multiplexedSession != nil {
336+
return nil
337+
}
338+
ctx, cancel := context.WithCancel(context.Background())
339+
creation := &multiplexedSessionCreation{
340+
done: make(chan struct{}),
341+
cancel: cancel,
342+
}
343+
p.multiplexedSessionCreation = creation
344+
go p.runMultiplexedSessionCreation(ctx, creation)
345+
return creation
375346
}
376347

377-
// sessionReady is called when a session has been created and is ready for use.
378-
func (p *sessionManager) sessionReady(ctx context.Context, s *session) {
348+
func (p *sessionManager) runMultiplexedSessionCreation(ctx context.Context, creation *multiplexedSessionCreation) {
349+
defer creation.cancel()
350+
379351
p.mu.Lock()
380-
defer p.mu.Unlock()
381-
s.sm = p
382-
p.multiplexedSession = s
383-
p.multiplexedSessionCreationError = nil
384-
p.recordStat(context.Background(), OpenSessionCount, int64(1), tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
385-
p.recordStat(context.Background(), SessionsCount, 1, tagNumSessions, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
386-
select {
387-
case p.mayGetMultiplexedSession <- true:
388-
case <-ctx.Done():
352+
p.sc.mu.Lock()
353+
client, err := p.sc.nextClient()
354+
p.sc.mu.Unlock()
355+
p.mu.Unlock()
356+
if err != nil {
357+
p.finishMultiplexedSessionCreation(creation, nil, err)
358+
return
389359
}
360+
361+
p.sc.executeCreateMultiplexedSession(ctx, client, p.sc.md, &multiplexedSessionCreationConsumer{
362+
manager: p,
363+
creation: creation,
364+
})
390365
}
391366

392-
// sessionCreationFailed is called when session creation fails.
393-
func (p *sessionManager) sessionCreationFailed(ctx context.Context, err error) {
394-
p.mu.Lock()
395-
defer p.mu.Unlock()
396-
if p.multiplexedSession != nil {
397-
p.multiplexedSessionCreationError = nil
398-
select {
399-
case p.mayGetMultiplexedSession <- true:
400-
case <-ctx.Done():
401-
return
367+
func (p *sessionManager) finishMultiplexedSessionCreation(creation *multiplexedSessionCreation, s *session, err error) {
368+
creation.once.Do(func() {
369+
p.mu.Lock()
370+
if p.multiplexedSessionCreation == creation {
371+
p.multiplexedSessionCreation = nil
372+
if p.valid && s != nil {
373+
s.sm = p
374+
p.multiplexedSession = s
375+
p.recordStat(context.Background(), OpenSessionCount, int64(1), tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
376+
p.recordStat(context.Background(), SessionsCount, 1, tagNumSessions, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
377+
}
402378
}
403-
return
404-
}
405-
p.recordStat(context.Background(), OpenSessionCount, int64(0), tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
406-
p.multiplexedSessionCreationError = err
407-
select {
408-
case p.mayGetMultiplexedSession <- true:
409-
case <-ctx.Done():
410-
return
411-
}
379+
p.mu.Unlock()
380+
381+
creation.err = err
382+
close(creation.done)
383+
})
384+
}
385+
386+
type multiplexedSessionCreationConsumer struct {
387+
manager *sessionManager
388+
creation *multiplexedSessionCreation
389+
}
390+
391+
func (c *multiplexedSessionCreationConsumer) sessionReady(_ context.Context, s *session) {
392+
c.manager.finishMultiplexedSessionCreation(c.creation, s, nil)
393+
}
394+
395+
func (c *multiplexedSessionCreationConsumer) sessionCreationFailed(_ context.Context, err error) {
396+
c.manager.finishMultiplexedSessionCreation(c.creation, nil, err)
412397
}
413398

414399
// isValid checks if the session pool is still valid.
@@ -438,9 +423,14 @@ func (p *sessionManager) close(ctx context.Context) {
438423
logf(p.sc.logger, "Failed to unregister callback from the OpenTelemetry meter, error : %v", err)
439424
}
440425
}
441-
p.mu.Unlock()
442426
p.once.Do(func() { close(p.done) })
443-
close(p.multiplexedSessionReq)
427+
creation := p.multiplexedSessionCreation
428+
p.multiplexedSessionCreation = nil
429+
p.mu.Unlock()
430+
if creation != nil {
431+
creation.cancel()
432+
p.finishMultiplexedSessionCreation(creation, nil, errInvalidSession)
433+
}
444434
}
445435

446436
// errInvalidSession is the error for using an invalid session.
@@ -495,6 +485,7 @@ func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, e
495485
trace.TracePrintf(ctx, nil, "Acquiring a multiplexed session")
496486
for {
497487
var s *session
488+
var creation *multiplexedSessionCreation
498489
p.mu.Lock()
499490
if !p.valid {
500491
p.mu.Unlock()
@@ -508,9 +499,8 @@ func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, e
508499
p.incNumMultiplexedInUse(ctx)
509500
return p.newSessionHandle(s), nil
510501
}
511-
mayGetSession := p.mayGetMultiplexedSession
502+
creation = p.ensureMultiplexedSessionCreationLocked(false)
512503
p.mu.Unlock()
513-
p.multiplexedSessionReq <- muxSessionCreateRequest{force: false, ctx: ctx}
514504
select {
515505
case <-ctx.Done():
516506
trace.TracePrintf(ctx, nil, "Context done waiting for multiplexed session")
@@ -519,15 +509,11 @@ func (p *sessionManager) takeMultiplexed(ctx context.Context) (*sessionHandle, e
519509
p.recordOTStat(ctx, p.otConfig.getSessionTimeoutsCount, 1, recordOTStatOption{attr: p.otConfig.attributeMapWithMultiplexed})
520510
}
521511
return nil, p.errGetSessionTimeout(ctx)
522-
case <-mayGetSession:
523-
p.mu.Lock()
524-
if p.multiplexedSessionCreationError != nil {
525-
trace.TracePrintf(ctx, nil, "Error creating multiplexed session: %v", p.multiplexedSessionCreationError)
526-
err := p.multiplexedSessionCreationError
527-
p.mu.Unlock()
528-
return nil, err
512+
case <-creation.done:
513+
if creation.err != nil {
514+
trace.TracePrintf(ctx, nil, "Error creating multiplexed session: %v", creation.err)
515+
return nil, creation.err
529516
}
530-
p.mu.Unlock()
531517
}
532518
}
533519
}
@@ -562,19 +548,18 @@ func (p *sessionManager) multiplexSessionWorker() {
562548
}
563549
p.mu.Unlock()
564550

565-
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
566551
if createTime.Add(multiplexSessionRefreshInterval).Before(time.Now()) {
567552
// Multiplexed session is idle for more than 7 days, replace it.
568-
p.multiplexedSessionReq <- muxSessionCreateRequest{force: true, ctx: ctx}
553+
p.mu.Lock()
554+
creation := p.ensureMultiplexedSessionCreationLocked(true)
555+
p.mu.Unlock()
569556
// wait for the new multiplexed session to be created.
570557
select {
571-
case <-p.mayGetMultiplexedSession:
558+
case <-creation.done:
572559
case <-p.done:
573-
cancel()
574560
return
575561
}
576562
}
577-
cancel()
578563

579564
// Sleep for a while to avoid burning CPU.
580565
select {

spanner/session_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package spanner
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"testing"
2223
"time"
2324

@@ -112,6 +113,78 @@ func TestMultiplexSessionWorker(t *testing.T) {
112113
}
113114
}
114115

116+
func TestMultiplexedSessionCreationWithInterleavedRequests(t *testing.T) {
117+
t.Parallel()
118+
119+
// Delay CreateSession so the initial multiplexed session creation stays
120+
// in flight while multiple readers arrive.
121+
server, opts, serverTeardown := NewMockedSpannerInMemTestServer(t)
122+
defer serverTeardown()
123+
server.TestSpanner.PutExecutionTime(MethodCreateSession,
124+
SimulatedExecutionTime{MinimumExecutionTime: 500 * time.Millisecond})
125+
126+
ctx := context.Background()
127+
db := fmt.Sprintf("projects/%s/instances/%s/databases/%s", "[PROJECT]", "[INSTANCE]", "[DATABASE]")
128+
client, err := NewClientWithConfig(ctx, db, ClientConfig{
129+
DisableNativeMetrics: true,
130+
}, opts...)
131+
if err != nil {
132+
t.Fatal(err)
133+
}
134+
defer client.Close()
135+
136+
// Start a read with an already-cancelled context while the initial
137+
// multiplexed session creation is still in flight. This waiter should not
138+
// prevent a concurrent valid read from succeeding once the session becomes
139+
// available.
140+
cancelledCtx, cancel := context.WithCancel(ctx)
141+
cancel()
142+
go client.Single().ReadRow(cancelledCtx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"})
143+
144+
// Give the cancelled read a brief head start so it begins waiting before the
145+
// valid read below.
146+
time.Sleep(50 * time.Millisecond)
147+
148+
// Start a second read with a valid context while the initial creation is
149+
// still in flight. This read must complete successfully.
150+
done := make(chan error, 1)
151+
go func() {
152+
defer func() {
153+
if r := recover(); r != nil {
154+
// Surface unexpected panics from the read goroutine as test failures.
155+
done <- fmt.Errorf("panic: %v", r)
156+
}
157+
}()
158+
readCtx, readCancel := context.WithTimeout(ctx, 15*time.Second)
159+
defer readCancel()
160+
_, err := client.Single().ReadRow(readCtx, "Albums", Key{"foo"}, []string{"SingerId", "AlbumId", "AlbumTitle"})
161+
done <- err
162+
}()
163+
164+
// Remove the artificial delay after the initial multiplexed session is ready
165+
// so any subsequent session creation is not slowed down.
166+
waitFor(t, func() error {
167+
client.sm.mu.Lock()
168+
defer client.sm.mu.Unlock()
169+
if client.sm.multiplexedSession == nil {
170+
return errInvalidSession
171+
}
172+
return nil
173+
})
174+
server.TestSpanner.Freeze()
175+
server.TestSpanner.PutExecutionTime(MethodCreateSession, SimulatedExecutionTime{})
176+
server.TestSpanner.Unfreeze()
177+
178+
select {
179+
case err := <-done:
180+
if err != nil {
181+
t.Fatalf("ReadRow returned unexpected error: %v", err)
182+
}
183+
case <-time.After(5 * time.Second):
184+
t.Fatal("ReadRow deadlocked with interleaved cancelled and valid requests during initial multiplexed session creation")
185+
}
186+
}
187+
115188
func waitFor(t *testing.T, assert func() error) {
116189
t.Helper()
117190
timeout := 15 * time.Second

spanner/sessionclient.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ func (sc *sessionClient) executeCreateMultiplexedSession(ctx context.Context, cl
209209
if closed {
210210
err := spannerErrorf(codes.Canceled, "Session client closed")
211211
trace.TracePrintf(ctx, nil, "Session client closed while creating a multiplexed session: %v", err)
212+
consumer.sessionCreationFailed(ctx, err)
212213
return
213214
}
214215
if ctx.Err() != nil {

0 commit comments

Comments
 (0)