Skip to content

Commit 735e29e

Browse files
fix: Retry creation of multiplexed session (#4288)
* fix(spanner): Retry creation of multiplexed session * Reduce wait time to run tests faster
1 parent 90bfabf commit 735e29e

File tree

3 files changed

+328
-22
lines changed

3 files changed

+328
-22
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.time.Duration;
4141
import java.time.Instant;
4242
import java.util.BitSet;
43+
import java.util.EnumSet;
4344
import java.util.HashMap;
4445
import java.util.Map;
4546
import java.util.concurrent.ExecutionException;
@@ -262,6 +263,9 @@ public void close() {
262263
*/
263264
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
264265

266+
private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES =
267+
EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE);
268+
265269
private final BitSet channelUsage;
266270

267271
private final int numChannels;
@@ -358,11 +362,19 @@ public void close() {
358362
SettableApiFuture.create();
359363
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
360364
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
365+
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
366+
maybeWaitForSessionCreation(
367+
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
368+
initialSessionReferenceFuture);
369+
}
370+
371+
private void asyncCreateMultiplexedSession(
372+
SettableApiFuture<SessionReference> sessionReferenceFuture) {
361373
this.sessionClient.asyncCreateMultiplexedSession(
362374
new SessionConsumer() {
363375
@Override
364376
public void onSessionReady(SessionImpl session) {
365-
initialSessionReferenceFuture.set(session.getSessionReference());
377+
sessionReferenceFuture.set(session.getSessionReference());
366378
// only start the maintainer if we actually managed to create a session in the first
367379
// place.
368380
maintainer.start();
@@ -395,33 +407,62 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
395407
// Mark multiplexes sessions as unimplemented and fall back to regular sessions if
396408
// UNIMPLEMENTED is returned.
397409
maybeMarkUnimplemented(t);
398-
initialSessionReferenceFuture.setException(t);
410+
sessionReferenceFuture.setException(t);
399411
}
400412
});
401-
maybeWaitForSessionCreation(
402-
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
403-
initialSessionReferenceFuture);
404413
}
405414

406415
void setPool(SessionPool pool) {
407416
this.pool = pool;
408417
}
409418

410-
private static void maybeWaitForSessionCreation(
411-
SessionPoolOptions sessionPoolOptions, ApiFuture<SessionReference> future) {
419+
private void maybeWaitForSessionCreation(
420+
SessionPoolOptions sessionPoolOptions,
421+
SettableApiFuture<SessionReference> initialSessionReferenceFuture) {
412422
Duration waitDuration = sessionPoolOptions.getWaitForMinSessions();
413423
if (waitDuration != null && !waitDuration.isZero()) {
414-
long timeoutMillis = waitDuration.toMillis();
415-
try {
416-
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
417-
} catch (ExecutionException executionException) {
418-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
419-
} catch (InterruptedException interruptedException) {
420-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
421-
} catch (TimeoutException timeoutException) {
422-
throw SpannerExceptionFactory.newSpannerException(
423-
ErrorCode.DEADLINE_EXCEEDED,
424-
"Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation");
424+
425+
SpannerException lastException = null;
426+
SettableApiFuture<SessionReference> sessionReferenceFuture = initialSessionReferenceFuture;
427+
Duration remainingTime;
428+
429+
Instant endTime = Instant.now().plus(waitDuration);
430+
while ((remainingTime = Duration.between(Instant.now(), endTime)).toMillis() > 0) {
431+
// If any exception is thrown, then retry the multiplexed session creation
432+
if (sessionReferenceFuture == null) {
433+
sessionReferenceFuture = SettableApiFuture.create();
434+
asyncCreateMultiplexedSession(sessionReferenceFuture);
435+
this.multiplexedSessionReference.set(sessionReferenceFuture);
436+
}
437+
try {
438+
sessionReferenceFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS);
439+
lastException = null;
440+
break;
441+
} catch (ExecutionException executionException) {
442+
lastException = SpannerExceptionFactory.asSpannerException(executionException.getCause());
443+
} catch (InterruptedException interruptedException) {
444+
lastException = SpannerExceptionFactory.propagateInterrupt(interruptedException);
445+
} catch (TimeoutException timeoutException) {
446+
lastException =
447+
SpannerExceptionFactory.newSpannerException(
448+
ErrorCode.DEADLINE_EXCEEDED,
449+
"Timed out after waiting "
450+
+ waitDuration.toMillis()
451+
+ "ms for multiplexed session creation");
452+
}
453+
// if any exception is thrown, then set the session reference to null to retry the
454+
// multiplexed session creation only if the error code is DEADLINE EXCEEDED, UNAVAILABLE or
455+
// RESOURCE_EXHAUSTED
456+
if (RETRYABLE_ERROR_CODES.contains(lastException.getErrorCode())) {
457+
sessionReferenceFuture = null;
458+
} else {
459+
break;
460+
}
461+
}
462+
// if the wait time elapsed and multiplexed session fetch failed then throw the last exception
463+
// that we have received
464+
if (lastException != null) {
465+
throw lastException;
425466
}
426467
}
427468
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -542,16 +542,16 @@ void simulateExecutionTime(
542542
boolean stickyGlobalExceptions,
543543
CountDownLatch freezeLock) {
544544
Uninterruptibles.awaitUninterruptibly(freezeLock);
545-
checkException(globalExceptions, stickyGlobalExceptions);
546-
if (streamIndices.isEmpty()) {
547-
checkException(this.exceptions, stickyException);
548-
}
549545
if (minimumExecutionTime > 0 || randomExecutionTime > 0) {
550546
Uninterruptibles.sleepUninterruptibly(
551547
(randomExecutionTime == 0 ? 0 : RANDOM.nextInt(randomExecutionTime))
552548
+ minimumExecutionTime,
553549
TimeUnit.MILLISECONDS);
554550
}
551+
checkException(globalExceptions, stickyGlobalExceptions);
552+
if (streamIndices.isEmpty()) {
553+
checkException(this.exceptions, stickyException);
554+
}
555555
}
556556

557557
private static void checkException(Queue<Exception> exceptions, boolean keepException) {

0 commit comments

Comments
 (0)