|
40 | 40 | import java.time.Duration; |
41 | 41 | import java.time.Instant; |
42 | 42 | import java.util.BitSet; |
| 43 | +import java.util.EnumSet; |
43 | 44 | import java.util.HashMap; |
44 | 45 | import java.util.Map; |
45 | 46 | import java.util.concurrent.ExecutionException; |
@@ -262,6 +263,9 @@ public void close() { |
262 | 263 | */ |
263 | 264 | private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>(); |
264 | 265 |
|
| 266 | + private static final EnumSet<ErrorCode> RETRYABLE_ERROR_CODES = |
| 267 | + EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE); |
| 268 | + |
265 | 269 | private final BitSet channelUsage; |
266 | 270 |
|
267 | 271 | private final int numChannels; |
@@ -358,11 +362,19 @@ public void close() { |
358 | 362 | SettableApiFuture.create(); |
359 | 363 | this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create(); |
360 | 364 | this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture); |
| 365 | + asyncCreateMultiplexedSession(initialSessionReferenceFuture); |
| 366 | + maybeWaitForSessionCreation( |
| 367 | + sessionClient.getSpanner().getOptions().getSessionPoolOptions(), |
| 368 | + initialSessionReferenceFuture); |
| 369 | + } |
| 370 | + |
| 371 | + private void asyncCreateMultiplexedSession( |
| 372 | + SettableApiFuture<SessionReference> sessionReferenceFuture) { |
361 | 373 | this.sessionClient.asyncCreateMultiplexedSession( |
362 | 374 | new SessionConsumer() { |
363 | 375 | @Override |
364 | 376 | public void onSessionReady(SessionImpl session) { |
365 | | - initialSessionReferenceFuture.set(session.getSessionReference()); |
| 377 | + sessionReferenceFuture.set(session.getSessionReference()); |
366 | 378 | // only start the maintainer if we actually managed to create a session in the first |
367 | 379 | // place. |
368 | 380 | maintainer.start(); |
@@ -395,33 +407,62 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount |
395 | 407 | // Mark multiplexes sessions as unimplemented and fall back to regular sessions if |
396 | 408 | // UNIMPLEMENTED is returned. |
397 | 409 | maybeMarkUnimplemented(t); |
398 | | - initialSessionReferenceFuture.setException(t); |
| 410 | + sessionReferenceFuture.setException(t); |
399 | 411 | } |
400 | 412 | }); |
401 | | - maybeWaitForSessionCreation( |
402 | | - sessionClient.getSpanner().getOptions().getSessionPoolOptions(), |
403 | | - initialSessionReferenceFuture); |
404 | 413 | } |
405 | 414 |
|
406 | 415 | void setPool(SessionPool pool) { |
407 | 416 | this.pool = pool; |
408 | 417 | } |
409 | 418 |
|
410 | | - private static void maybeWaitForSessionCreation( |
411 | | - SessionPoolOptions sessionPoolOptions, ApiFuture<SessionReference> future) { |
| 419 | + private void maybeWaitForSessionCreation( |
| 420 | + SessionPoolOptions sessionPoolOptions, |
| 421 | + SettableApiFuture<SessionReference> initialSessionReferenceFuture) { |
412 | 422 | Duration waitDuration = sessionPoolOptions.getWaitForMinSessions(); |
413 | 423 | if (waitDuration != null && !waitDuration.isZero()) { |
414 | | - long timeoutMillis = waitDuration.toMillis(); |
415 | | - try { |
416 | | - future.get(timeoutMillis, TimeUnit.MILLISECONDS); |
417 | | - } catch (ExecutionException executionException) { |
418 | | - throw SpannerExceptionFactory.asSpannerException(executionException.getCause()); |
419 | | - } catch (InterruptedException interruptedException) { |
420 | | - throw SpannerExceptionFactory.propagateInterrupt(interruptedException); |
421 | | - } catch (TimeoutException timeoutException) { |
422 | | - throw SpannerExceptionFactory.newSpannerException( |
423 | | - ErrorCode.DEADLINE_EXCEEDED, |
424 | | - "Timed out after waiting " + timeoutMillis + "ms for multiplexed session creation"); |
| 424 | + |
| 425 | + SpannerException lastException = null; |
| 426 | + SettableApiFuture<SessionReference> sessionReferenceFuture = initialSessionReferenceFuture; |
| 427 | + Duration remainingTime; |
| 428 | + |
| 429 | + Instant endTime = Instant.now().plus(waitDuration); |
| 430 | + while ((remainingTime = Duration.between(Instant.now(), endTime)).toMillis() > 0) { |
| 431 | + // If any exception is thrown, then retry the multiplexed session creation |
| 432 | + if (sessionReferenceFuture == null) { |
| 433 | + sessionReferenceFuture = SettableApiFuture.create(); |
| 434 | + asyncCreateMultiplexedSession(sessionReferenceFuture); |
| 435 | + this.multiplexedSessionReference.set(sessionReferenceFuture); |
| 436 | + } |
| 437 | + try { |
| 438 | + sessionReferenceFuture.get(remainingTime.toMillis(), TimeUnit.MILLISECONDS); |
| 439 | + lastException = null; |
| 440 | + break; |
| 441 | + } catch (ExecutionException executionException) { |
| 442 | + lastException = SpannerExceptionFactory.asSpannerException(executionException.getCause()); |
| 443 | + } catch (InterruptedException interruptedException) { |
| 444 | + lastException = SpannerExceptionFactory.propagateInterrupt(interruptedException); |
| 445 | + } catch (TimeoutException timeoutException) { |
| 446 | + lastException = |
| 447 | + SpannerExceptionFactory.newSpannerException( |
| 448 | + ErrorCode.DEADLINE_EXCEEDED, |
| 449 | + "Timed out after waiting " |
| 450 | + + waitDuration.toMillis() |
| 451 | + + "ms for multiplexed session creation"); |
| 452 | + } |
| 453 | + // if any exception is thrown, then set the session reference to null to retry the |
| 454 | + // multiplexed session creation only if the error code is DEADLINE EXCEEDED, UNAVAILABLE or |
| 455 | + // RESOURCE_EXHAUSTED |
| 456 | + if (RETRYABLE_ERROR_CODES.contains(lastException.getErrorCode())) { |
| 457 | + sessionReferenceFuture = null; |
| 458 | + } else { |
| 459 | + break; |
| 460 | + } |
| 461 | + } |
| 462 | + // if the wait time elapsed and multiplexed session fetch failed then throw the last exception |
| 463 | + // that we have received |
| 464 | + if (lastException != null) { |
| 465 | + throw lastException; |
425 | 466 | } |
426 | 467 | } |
427 | 468 | } |
|
0 commit comments