3030import com .google .cloud .storage .Hasher .UncheckedChecksumMismatchException ;
3131import com .google .cloud .storage .ResponseContentLifecycleHandle .ChildRef ;
3232import com .google .cloud .storage .RetryContext .OnSuccess ;
33+ import com .google .common .annotations .VisibleForTesting ;
3334import com .google .common .base .Preconditions ;
3435import com .google .protobuf .ByteString ;
3536import com .google .rpc .Status ;
@@ -61,6 +62,7 @@ final class BlobDescriptorStream
6162 private final ScheduledExecutorService executor ;
6263 private final ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse >
6364 callable ;
65+ private final RetryContext streamRetryContext ;
6466 private final int maxRedirectsAllowed ;
6567
6668 private volatile boolean open ;
@@ -74,14 +76,16 @@ private BlobDescriptorStream(
7476 BlobDescriptorState state ,
7577 ScheduledExecutorService executor ,
7678 ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
77- int maxRedirectsAllowed ) {
79+ int maxRedirectsAllowed ,
80+ RetryContext backoff ) {
7881 this .state = state ;
7982 this .executor = executor ;
8083 this .callable = callable ;
84+ this .streamRetryContext = backoff ;
8185 this .blobDescriptorResolveFuture = SettableApiFuture .create ();
86+ this .maxRedirectsAllowed = maxRedirectsAllowed ;
8287 this .open = true ;
8388 this .redirectCounter = new AtomicInteger ();
84- this .maxRedirectsAllowed = maxRedirectsAllowed ;
8589 }
8690
8791 // TODO: make this more elegant
@@ -104,20 +108,20 @@ private ClientStream<BidiReadObjectRequest> getRequestStream(@Nullable GrpcCallC
104108 }
105109
106110 @ Override
107- public void close () throws IOException {
111+ public void close () {
108112 if (!open ) {
109113 return ;
110114 }
115+ open = false ;
116+ cleanUp ();
117+ }
111118
112- try {
113- cancel (true );
114- if (requestStream != null ) {
115- requestStream .closeSend ();
116- ApiFutureUtils .await (monitoringResponseObserver .closeSignal );
117- requestStream = null ;
118- }
119- } finally {
120- open = false ;
119+ private void cleanUp () {
120+ cancel (true );
121+ if (requestStream != null ) {
122+ requestStream .closeSend ();
123+ ApiFutureUtils .await (monitoringResponseObserver .closeSignal );
124+ requestStream = null ;
121125 }
122126 }
123127
@@ -183,20 +187,35 @@ public boolean isDone() {
183187 return blobDescriptorResolveFuture .isDone ();
184188 }
185189
190+ boolean isOpen () {
191+ return open ;
192+ }
193+
186194 private void checkOpen () {
187- Preconditions .checkState (open , "not open " );
195+ Preconditions .checkState (open , "Stream closed " );
188196 }
189197
190- private void restart () {
191- reset ();
198+ @ VisibleForTesting
199+ void restart () {
200+ Preconditions .checkState (
201+ requestStream == null , "attempting to restart stream when stream is already active" );
192202
193203 OpenArguments openArguments = state .getOpenArguments ();
194- ClientStream <BidiReadObjectRequest > requestStream1 = getRequestStream (openArguments .getCtx ());
195- requestStream1 .send (openArguments .getReq ());
204+ BidiReadObjectRequest req = openArguments .getReq ();
205+ if (!req .getReadRangesList ().isEmpty () || !blobDescriptorResolveFuture .isDone ()) {
206+ ClientStream <BidiReadObjectRequest > requestStream1 = getRequestStream (openArguments .getCtx ());
207+ requestStream1 .send (req );
208+ }
196209 }
197210
198- private void reset () {
199- requestStream = null ;
211+ private void failAll (Throwable terminalFailure ) {
212+ open = false ;
213+ try {
214+ blobDescriptorResolveFuture .setException (terminalFailure );
215+ state .failAll (executor , terminalFailure );
216+ } finally {
217+ cleanUp ();
218+ }
200219 }
201220
202221 private final class BidiReadObjectResponseObserver
@@ -319,39 +338,55 @@ public void onResponse(BidiReadObjectResponse response) {
319338 }
320339 }
321340 } catch (IOException e ) {
322- // TODO: sync this up with stream restarts when the time comes
323- throw StorageException .coalesce (e );
341+ //
342+ // When using zero-copy, the returned InputStream is of type InputStream rather than its
343+ // concrete subclass. The subclass is `io.grpc.internal.ReadableBuffers.BufferInputStream`
344+ // which exclusively operates on a `io.grpc.internal.ReadableBuffer`. `ReadableBuffer`s
345+ // close method does not throw.
346+ //
347+ // This is defined as an exhaustiveness compliance. {@code javac} dictates we handle an
348+ // `IOException`, even though the underlying classes won't throw it. If the behavior in grpc
349+ // at some point does throw, we catch it here and funnel it into the stream retry handling.
350+ //
351+ requestStream = null ;
352+ streamRetryContext .recordError (
353+ e , BlobDescriptorStream .this ::restart , BlobDescriptorStream .this ::failAll );
324354 }
325355 }
326356
327357 @ Override
328358 public void onError (Throwable t ) {
329- reset ();
330- BidiReadObjectError error = GrpcUtils .getBidiReadObjectError (t );
331- if (error == null ) {
332- return ;
333- }
359+ requestStream = null ;
360+ try {
361+ BidiReadObjectError error = GrpcUtils .getBidiReadObjectError (t );
362+ if (error == null ) {
363+ return ;
364+ }
334365
335- List <ReadRangeError > rangeErrors = error .getReadRangeErrorsList ();
336- if (rangeErrors .isEmpty ()) {
337- return ;
338- }
339- for (ReadRangeError rangeError : rangeErrors ) {
340- Status status = rangeError .getStatus ();
341- long id = rangeError .getReadId ();
342- BlobDescriptorStreamRead read = state .getOutstandingRead (id );
343- if (read == null ) {
344- continue ;
366+ List <ReadRangeError > rangeErrors = error .getReadRangeErrorsList ();
367+ if (rangeErrors .isEmpty ()) {
368+ return ;
345369 }
346- // mark read as failed, but don't resolve its future now. Schedule the delivery of the
347- // failure in executor to ensure any downstream future doesn't block this IO thread.
348- read .preFail ();
349- executor .execute (
350- StorageException .liftToRunnable (
351- () ->
352- state
353- .removeOutstandingReadOnFailure (id , read ::fail )
354- .onFailure (GrpcUtils .statusToApiException (status ))));
370+ for (ReadRangeError rangeError : rangeErrors ) {
371+ Status status = rangeError .getStatus ();
372+ long id = rangeError .getReadId ();
373+ BlobDescriptorStreamRead read = state .getOutstandingRead (id );
374+ if (read == null ) {
375+ continue ;
376+ }
377+ // mark read as failed, but don't resolve its future now. Schedule the delivery of the
378+ // failure in executor to ensure any downstream future doesn't block this IO thread.
379+ read .preFail ();
380+ executor .execute (
381+ StorageException .liftToRunnable (
382+ () ->
383+ state
384+ .removeOutstandingReadOnFailure (id , read ::fail )
385+ .onFailure (GrpcUtils .statusToApiException (status ))));
386+ }
387+ } finally {
388+ streamRetryContext .recordError (
389+ t , BlobDescriptorStream .this ::restart , BlobDescriptorStream .this ::failAll );
355390 }
356391 }
357392
@@ -435,6 +470,7 @@ public void onError(Throwable t) {
435470 delegate .onError (t );
436471 return ;
437472 }
473+ requestStream = null ;
438474 int redirectCount = redirectCounter .incrementAndGet ();
439475 if (redirectCount > maxRedirectsAllowed ) {
440476 // attach the fact we're ignoring the redirect to the original exception as a suppressed
@@ -462,10 +498,11 @@ public void onComplete() {
462498 static BlobDescriptorStream create (
463499 ScheduledExecutorService executor ,
464500 ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
465- BlobDescriptorState state ) {
501+ BlobDescriptorState state ,
502+ RetryContext retryContext ) {
466503
467504 int maxRedirectsAllowed = 3 ; // TODO: make this configurable in the ultimate public surface
468- return new BlobDescriptorStream (state , executor , callable , maxRedirectsAllowed );
505+ return new BlobDescriptorStream (state , executor , callable , maxRedirectsAllowed , retryContext );
469506 }
470507
471508 static final class MaxRedirectsExceededException extends RuntimeException {
0 commit comments