Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 3baa84e

Browse files
authored
fix: A stuck when the client fail to get DoneCallback (#1637)
Add a timeout of one minute waiting for done callback to be called. Same timeout as client close. The donecallback mainly gives back the server side error status, so it is not critical. In Dataflow connector, we saw hang because the DoneCallback is lost and we wait forever on it. Stack trace in b/230501926
1 parent 73ddd7b commit 3baa84e

1 file changed

Lines changed: 17 additions & 3 deletions

File tree

  • google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ private void appendLoop() {
465465
// We can close the stream connection and handle the remaining inflight requests.
466466
if (streamConnection != null) {
467467
this.streamConnection.close();
468-
waitForDoneCallback();
468+
waitForDoneCallback(1, TimeUnit.MINUTES);
469469
}
470470

471471
// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
@@ -491,9 +491,10 @@ private boolean waitingQueueDrained() {
491491
}
492492
}
493493

494-
private void waitForDoneCallback() {
494+
private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
495495
log.fine("Waiting for done callback from stream connection. Stream: " + streamName);
496-
while (true) {
496+
long deadline = System.nanoTime() + timeUnit.toNanos(duration);
497+
while (System.nanoTime() <= deadline) {
497498
this.lock.lock();
498499
try {
499500
if (connectionFinalStatus != null) {
@@ -505,6 +506,19 @@ private void waitForDoneCallback() {
505506
}
506507
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
507508
}
509+
this.lock.lock();
510+
try {
511+
if (connectionFinalStatus == null) {
512+
connectionFinalStatus =
513+
new StatusRuntimeException(
514+
Status.fromCode(Code.CANCELLED)
515+
.withDescription("Timeout waiting for DoneCallback."));
516+
}
517+
} finally {
518+
this.lock.unlock();
519+
}
520+
521+
return;
508522
}
509523

510524
private AppendRowsRequest prepareRequestBasedOnPosition(

0 commit comments

Comments
 (0)