@@ -215,30 +215,33 @@ public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
215215 this .inflightBytes += requestWrapper .messageSize ;
216216 waitingRequestQueue .addLast (requestWrapper );
217217 hasMessageInWaitingQueue .signal ();
218-
219- // Maybe block until we are below inflight limit.
220- while (this .inflightRequests >= this .maxInflightRequests
221- || this .inflightBytes >= this .maxInflightBytes ) {
222- try {
223- inflightReduced .await (100 , TimeUnit .MILLISECONDS );
224- } catch (InterruptedException e ) {
225- log .warning (
226- "Interrupted while waiting for inflight quota. Stream: "
227- + streamName
228- + " Error: "
229- + e .toString ());
230- throw new StatusRuntimeException (
231- Status .fromCode (Code .CANCELLED )
232- .withCause (e )
233- .withDescription ("Interrupted while waiting for quota." ));
234- }
235- }
218+ maybeWaitForInflightQuota ();
236219 return requestWrapper .appendResult ;
237220 } finally {
238221 this .lock .unlock ();
239222 }
240223 }
241224
225+ @ GuardedBy ("lock" )
226+ private void maybeWaitForInflightQuota () {
227+ while (this .inflightRequests >= this .maxInflightRequests
228+ || this .inflightBytes >= this .maxInflightBytes ) {
229+ try {
230+ inflightReduced .await (100 , TimeUnit .MILLISECONDS );
231+ } catch (InterruptedException e ) {
232+ log .warning (
233+ "Interrupted while waiting for inflight quota. Stream: "
234+ + streamName
235+ + " Error: "
236+ + e .toString ());
237+ throw new StatusRuntimeException (
238+ Status .fromCode (Code .CANCELLED )
239+ .withCause (e )
240+ .withDescription ("Interrupted while waiting for quota." ));
241+ }
242+ }
243+ }
244+
242245 /** Close the stream writer. Shut down all resources. */
243246 @ Override
244247 public void close () {
0 commit comments