Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 74bb949

Browse files
committed
Move inflight quota wait to separate method.
1 parent 318c302 commit 74bb949

File tree

1 file changed

+21
-18
lines changed
  • google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2

1 file changed

+21
-18
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -215,30 +215,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
215215
this.inflightBytes += requestWrapper.messageSize;
216216
waitingRequestQueue.addLast(requestWrapper);
217217
hasMessageInWaitingQueue.signal();
218-
219-
// Maybe block until we are below inflight limit.
220-
while (this.inflightRequests >= this.maxInflightRequests
221-
|| this.inflightBytes >= this.maxInflightBytes) {
222-
try {
223-
inflightReduced.await(100, TimeUnit.MILLISECONDS);
224-
} catch (InterruptedException e) {
225-
log.warning(
226-
"Interrupted while waiting for inflight quota. Stream: "
227-
+ streamName
228-
+ " Error: "
229-
+ e.toString());
230-
throw new StatusRuntimeException(
231-
Status.fromCode(Code.CANCELLED)
232-
.withCause(e)
233-
.withDescription("Interrupted while waiting for quota."));
234-
}
235-
}
218+
maybeWaitForInflightQuota();
236219
return requestWrapper.appendResult;
237220
} finally {
238221
this.lock.unlock();
239222
}
240223
}
241224

225+
@GuardedBy("lock")
226+
private void maybeWaitForInflightQuota() {
227+
while (this.inflightRequests >= this.maxInflightRequests
228+
|| this.inflightBytes >= this.maxInflightBytes) {
229+
try {
230+
inflightReduced.await(100, TimeUnit.MILLISECONDS);
231+
} catch (InterruptedException e) {
232+
log.warning(
233+
"Interrupted while waiting for inflight quota. Stream: "
234+
+ streamName
235+
+ " Error: "
236+
+ e.toString());
237+
throw new StatusRuntimeException(
238+
Status.fromCode(Code.CANCELLED)
239+
.withCause(e)
240+
.withDescription("Interrupted while waiting for quota."));
241+
}
242+
}
243+
}
244+
242245
/** Close the stream writer. Shut down all resources. */
243246
@Override
244247
public void close() {

0 commit comments

Comments
 (0)