2121import com .google .api .gax .grpc .GrpcCallContext ;
2222import com .google .api .gax .rpc .BidiStreamingCallable ;
2323import com .google .api .gax .rpc .ClientStream ;
24- import com .google .api .gax .rpc .StateCheckingResponseObserver ;
24+ import com .google .api .gax .rpc .ResponseObserver ;
2525import com .google .api .gax .rpc .StreamController ;
2626import com .google .common .base .Preconditions ;
27+ import com .google .rpc .Status ;
2728import com .google .storage .v2 .BidiReadHandle ;
29+ import com .google .storage .v2 .BidiReadObjectError ;
2830import com .google .storage .v2 .BidiReadObjectRedirectedError ;
2931import com .google .storage .v2 .BidiReadObjectRequest ;
3032import com .google .storage .v2 .BidiReadObjectResponse ;
3133import com .google .storage .v2 .Object ;
3234import com .google .storage .v2 .ObjectRangeData ;
35+ import com .google .storage .v2 .ReadRangeError ;
3336import java .io .IOException ;
3437import java .util .List ;
3538import java .util .concurrent .ExecutionException ;
3639import java .util .concurrent .Executor ;
3740import java .util .concurrent .TimeUnit ;
3841import java .util .concurrent .TimeoutException ;
42+ import java .util .concurrent .atomic .AtomicInteger ;
3943
4044final class BlobDescriptorStream
4145 implements ClientStream <BidiReadObjectRequest >, ApiFuture <Void >, AutoCloseable {
@@ -48,25 +52,31 @@ final class BlobDescriptorStream
4852 private final Executor executor ;
4953 private final BidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ;
5054 private final GrpcCallContext context ;
55+ private final int maxRedirectsAllowed ;
5156
5257 private volatile boolean open ;
5358 private volatile MonitoringResponseObserver monitoringResponseObserver ;
59+ private volatile ResponseObserver <BidiReadObjectResponse > responseObserver ;
5460 private volatile ClientStream <BidiReadObjectRequest > requestStream ;
5561 private volatile StreamController controller ;
62+ private final AtomicInteger redirectCounter ;
5663
5764 private BlobDescriptorStream (
5865 BlobDescriptorState state ,
5966 Executor executor ,
6067 ResponseContentLifecycleManager <BidiReadObjectResponse > bidiResponseContentLifecycleManager ,
6168 BidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
62- GrpcCallContext context ) {
69+ GrpcCallContext context ,
70+ int maxRedirectsAllowed ) {
6371 this .state = state ;
6472 this .executor = executor ;
6573 this .bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager ;
6674 this .callable = callable ;
6775 this .context = context ;
6876 this .blobDescriptorResolveFuture = SettableApiFuture .create ();
6977 this .open = true ;
78+ this .redirectCounter = new AtomicInteger ();
79+ this .maxRedirectsAllowed = maxRedirectsAllowed ;
7080 }
7181
7282 public ClientStream <BidiReadObjectRequest > getRequestStream () {
@@ -77,7 +87,10 @@ public ClientStream<BidiReadObjectRequest> getRequestStream() {
7787 if (requestStream == null ) {
7888 monitoringResponseObserver =
7989 new MonitoringResponseObserver (new BidiReadObjectResponseObserver ());
80- requestStream = callable .splitCall (monitoringResponseObserver , context );
90+ responseObserver =
91+ GrpcUtils .decorateAsStateChecking (
92+ new RedirectHandlingResponseObserver (monitoringResponseObserver ));
93+ requestStream = callable .splitCall (responseObserver , context );
8194 }
8295 return requestStream ;
8396 }
@@ -105,7 +118,11 @@ public void close() throws IOException {
105118 @ Override
106119 public void send (BidiReadObjectRequest request ) {
107120 checkOpen ();
108- getRequestStream ().send (request );
121+ if (requestStream == null ) {
122+ restart ();
123+ } else {
124+ getRequestStream ().send (request );
125+ }
109126 }
110127
111128 @ Override
@@ -162,7 +179,7 @@ private void checkOpen() {
162179 }
163180
164181 private void restart () {
165- requestStream = null ;
182+ reset () ;
166183
167184 BidiReadObjectRequest openRequest = state .getOpenRequest ();
168185 BidiReadObjectRequest .Builder b = openRequest .toBuilder ().clearReadRanges ();
@@ -186,28 +203,28 @@ private void restart() {
186203 }
187204
188205 BidiReadObjectRequest restartRequest = b .build ();
189- synchronized ( this ) {
190- ClientStream < BidiReadObjectRequest > requestStream1 = getRequestStream ( );
191- requestStream1 . send ( restartRequest );
192- // todo: put this in a retry loop
193- ApiFutureUtils . await ( monitoringResponseObserver . openSignal );
194- }
206+ ClientStream < BidiReadObjectRequest > requestStream1 = getRequestStream ();
207+ requestStream1 . send ( restartRequest );
208+ }
209+
210+ private void reset () {
211+ requestStream = null ;
195212 }
196213
197214 private final class BidiReadObjectResponseObserver
198- extends StateCheckingResponseObserver <BidiReadObjectResponse > {
215+ implements ResponseObserver <BidiReadObjectResponse > {
199216
200217 private BidiReadObjectResponseObserver () {}
201218
202219 @ Override
203- public void onStartImpl (StreamController controller ) {
220+ public void onStart (StreamController controller ) {
204221 BlobDescriptorStream .this .controller = controller ;
205222 controller .disableAutoInboundFlowControl ();
206223 controller .request (2 );
207224 }
208225
209226 @ Override
210- protected void onResponseImpl (BidiReadObjectResponse response ) {
227+ public void onResponse (BidiReadObjectResponse response ) {
211228 controller .request (1 );
212229 try (ResponseContentLifecycleHandle <BidiReadObjectResponse > handle =
213230 bidiResponseContentLifecycleManager .get (response )) {
@@ -237,16 +254,12 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
237254 // invoke eof on exec, the resolving future could have a downstream callback
238255 // that we don't want to block this grpc thread
239256 executor .execute (
240- () -> {
241- try {
242- read .eof ();
243- // don't remove the outstanding read until the future has been resolved
244- state .removeOutstandingRead (id );
245- } catch (IOException e ) {
246- // TODO: sync this up with stream restarts when the time comes
247- throw StorageException .coalesce (e );
248- }
249- });
257+ StorageException .liftToRunnable (
258+ () -> {
259+ read .eof ();
260+ // don't remove the outstanding read until the future has been resolved
261+ state .removeOutstandingRead (id );
262+ }));
250263 }
251264 }
252265 } catch (IOException e ) {
@@ -256,27 +269,39 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
256269 }
257270
258271 @ Override
259- protected void onErrorImpl (Throwable t ) {
260- BidiReadObjectRedirectedError bidiReadObjectRedirectedError =
261- GrpcUtils .getBidiReadObjectRedirectedError (t );
262- if (bidiReadObjectRedirectedError != null ) {
263- if (bidiReadObjectRedirectedError .hasReadHandle ()) {
264- state .setBidiReadHandle (bidiReadObjectRedirectedError .getReadHandle ());
265- }
266- if (bidiReadObjectRedirectedError .hasRoutingToken ()) {
267- state .setRoutingToken (bidiReadObjectRedirectedError .getRoutingToken ());
272+ public void onError (Throwable t ) {
273+ BidiReadObjectError error = GrpcUtils .getBidiReadObjectError (t );
274+ if (error == null ) {
275+ return ;
276+ }
277+
278+ List <ReadRangeError > rangeErrors = error .getReadRangeErrorsList ();
279+ if (rangeErrors .isEmpty ()) {
280+ return ;
281+ }
282+ for (ReadRangeError rangeError : rangeErrors ) {
283+ Status status = rangeError .getStatus ();
284+ long id = rangeError .getReadId ();
285+ BlobDescriptorStreamRead read = state .getOutstandingRead (id );
286+ if (read == null ) {
287+ continue ;
268288 }
269- executor .execute (BlobDescriptorStream .this ::restart );
289+ executor .execute (
290+ StorageException .liftToRunnable (
291+ () -> {
292+ read .fail (status );
293+ state .removeOutstandingRead (id );
294+ }));
270295 }
296+ reset ();
271297 }
272298
273299 @ Override
274- protected void onCompleteImpl () {}
300+ public void onComplete () {}
275301 }
276302
277- private class MonitoringResponseObserver
278- extends StateCheckingResponseObserver <BidiReadObjectResponse > {
279- private final BidiReadObjectResponseObserver delegate ;
303+ private class MonitoringResponseObserver implements ResponseObserver <BidiReadObjectResponse > {
304+ private final BlobDescriptorStream .BidiReadObjectResponseObserver delegate ;
280305 private final SettableApiFuture <Void > openSignal ;
281306 private final SettableApiFuture <Void > closeSignal ;
282307
@@ -287,46 +312,106 @@ private MonitoringResponseObserver(BidiReadObjectResponseObserver delegate) {
287312 }
288313
289314 @ Override
290- protected void onStartImpl (StreamController controller ) {
315+ public void onStart (StreamController controller ) {
291316 delegate .onStart (controller );
292317 }
293318
294319 @ Override
295- protected void onResponseImpl (BidiReadObjectResponse response ) {
320+ public void onResponse (BidiReadObjectResponse response ) {
296321 delegate .onResponse (response );
297322 openSignal .set (null );
298323 blobDescriptorResolveFuture .set (null );
299324 }
300325
301326 @ Override
302- protected void onErrorImpl (Throwable t ) {
303- if (GrpcUtils .isBidiReadObjectRedirect (t )) {
304- delegate .onError (t );
305- } else {
306- delegate .onError (t );
307- blobDescriptorResolveFuture .setException (t );
308- openSignal .setException (t );
309- closeSignal .setException (t );
310- }
327+ public void onError (Throwable t ) {
328+ delegate .onError (t );
329+ blobDescriptorResolveFuture .setException (t );
330+ openSignal .setException (t );
331+ closeSignal .setException (t );
311332 }
312333
313334 @ Override
314- protected void onCompleteImpl () {
335+ public void onComplete () {
315336 delegate .onComplete ();
316337 blobDescriptorResolveFuture .set (null );
317338 openSignal .set (null );
318339 closeSignal .set (null );
319340 }
320341 }
321342
343+ private final class RedirectHandlingResponseObserver
344+ implements ResponseObserver <BidiReadObjectResponse > {
345+ private final ResponseObserver <BidiReadObjectResponse > delegate ;
346+
347+ private RedirectHandlingResponseObserver (ResponseObserver <BidiReadObjectResponse > delegate ) {
348+ this .delegate = delegate ;
349+ }
350+
351+ @ Override
352+ public void onStart (StreamController controller ) {
353+ delegate .onStart (controller );
354+ }
355+
356+ @ Override
357+ public void onResponse (BidiReadObjectResponse response ) {
358+ redirectCounter .set (0 );
359+ delegate .onResponse (response );
360+ }
361+
362+ @ Override
363+ public void onError (Throwable t ) {
364+ BidiReadObjectRedirectedError error = GrpcUtils .getBidiReadObjectRedirectedError (t );
365+ if (error == null ) {
366+ delegate .onError (t );
367+ return ;
368+ }
369+ int redirectCount = redirectCounter .incrementAndGet ();
370+ if (redirectCount > maxRedirectsAllowed ) {
371+ // attach the fact we're ignoring the redirect to the original exception as a suppressed
372+ // Exception. The lower level handler can then perform its usual handling, but if things
373+ // bubble all the way up to the invoker we'll be able to see it in a bug report.
374+ t .addSuppressed (new MaxRedirectsExceededException (maxRedirectsAllowed , redirectCount ));
375+ delegate .onError (t );
376+ return ;
377+ }
378+ if (error .hasReadHandle ()) {
379+ state .setBidiReadHandle (error .getReadHandle ());
380+ }
381+ if (error .hasRoutingToken ()) {
382+ state .setRoutingToken (error .getRoutingToken ());
383+ }
384+ executor .execute (BlobDescriptorStream .this ::restart );
385+ }
386+
387+ @ Override
388+ public void onComplete () {
389+ delegate .onComplete ();
390+ }
391+ }
392+
322393 static BlobDescriptorStream create (
323394 Executor executor ,
324395 ResponseContentLifecycleManager <BidiReadObjectResponse > bidiResponseContentLifecycleManager ,
325396 BidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
326397 GrpcCallContext context ,
327398 BlobDescriptorState state ) {
328399
400+ int maxRedirectsAllowed = 3 ; // TODO: make this configurable in the ultimate public surface
329401 return new BlobDescriptorStream (
330- state , executor , bidiResponseContentLifecycleManager , callable , context );
402+ state ,
403+ executor ,
404+ bidiResponseContentLifecycleManager ,
405+ callable ,
406+ context ,
407+ maxRedirectsAllowed );
408+ }
409+
410+ static final class MaxRedirectsExceededException extends RuntimeException {
411+ private MaxRedirectsExceededException (int maxRedirectAllowed , int actualRedirects ) {
412+ super (
413+ String .format (
414+ "max redirects exceeded (max: %d, actual: %d)" , maxRedirectAllowed , actualRedirects ));
415+ }
331416 }
332417}
0 commit comments