Skip to content

Commit 0a2fac5

Browse files
committed
chore: update BlobDescriptor implementation to limit consecutive redirects to 3
* fix: update BlobDescriptor implementation to limit consecutive redirects to 3 If a redirect error is received 4 times in a row, the fourth redirect will raise an error. Counting is reset when a BidiReadObjectResponse is received. * chore: de-nestify * fix: update BlobDescriptorStream to handle BidiReadObjectError If the BlobDescriptorStream receives a BidiReadObjectError, each read_id contained within will be signaled with the corresponding Status. In the case of a pending future, the future will be resolved with the ApiException corresponding to the Status. The BlobDescriptorStream will stay "open", and allow future requests be submitted. In a followup PR I will be implementing finegrained read_id status tracking, allowing for tombstoning an individual read that has not yet been removed for the tracked state. (Removal of tracked state must happen after we have signaled failure to the caller, but the signaling of failure happens on a separate thread to avoid any future callbacks from running on the grpc thread.) * chore: attach suppressed exception in the event max redirects is exceeded
1 parent 35c2fbf commit 0a2fac5

File tree

5 files changed

+473
-99
lines changed

5 files changed

+473
-99
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 136 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,25 @@
2121
import com.google.api.gax.grpc.GrpcCallContext;
2222
import com.google.api.gax.rpc.BidiStreamingCallable;
2323
import com.google.api.gax.rpc.ClientStream;
24-
import com.google.api.gax.rpc.StateCheckingResponseObserver;
24+
import com.google.api.gax.rpc.ResponseObserver;
2525
import com.google.api.gax.rpc.StreamController;
2626
import com.google.common.base.Preconditions;
27+
import com.google.rpc.Status;
2728
import com.google.storage.v2.BidiReadHandle;
29+
import com.google.storage.v2.BidiReadObjectError;
2830
import com.google.storage.v2.BidiReadObjectRedirectedError;
2931
import com.google.storage.v2.BidiReadObjectRequest;
3032
import com.google.storage.v2.BidiReadObjectResponse;
3133
import com.google.storage.v2.Object;
3234
import com.google.storage.v2.ObjectRangeData;
35+
import com.google.storage.v2.ReadRangeError;
3336
import java.io.IOException;
3437
import java.util.List;
3538
import java.util.concurrent.ExecutionException;
3639
import java.util.concurrent.Executor;
3740
import java.util.concurrent.TimeUnit;
3841
import java.util.concurrent.TimeoutException;
42+
import java.util.concurrent.atomic.AtomicInteger;
3943

4044
final class BlobDescriptorStream
4145
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void>, AutoCloseable {
@@ -48,25 +52,31 @@ final class BlobDescriptorStream
4852
private final Executor executor;
4953
private final BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable;
5054
private final GrpcCallContext context;
55+
private final int maxRedirectsAllowed;
5156

5257
private volatile boolean open;
5358
private volatile MonitoringResponseObserver monitoringResponseObserver;
59+
private volatile ResponseObserver<BidiReadObjectResponse> responseObserver;
5460
private volatile ClientStream<BidiReadObjectRequest> requestStream;
5561
private volatile StreamController controller;
62+
private final AtomicInteger redirectCounter;
5663

5764
private BlobDescriptorStream(
5865
BlobDescriptorState state,
5966
Executor executor,
6067
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
6168
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
62-
GrpcCallContext context) {
69+
GrpcCallContext context,
70+
int maxRedirectsAllowed) {
6371
this.state = state;
6472
this.executor = executor;
6573
this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager;
6674
this.callable = callable;
6775
this.context = context;
6876
this.blobDescriptorResolveFuture = SettableApiFuture.create();
6977
this.open = true;
78+
this.redirectCounter = new AtomicInteger();
79+
this.maxRedirectsAllowed = maxRedirectsAllowed;
7080
}
7181

7282
public ClientStream<BidiReadObjectRequest> getRequestStream() {
@@ -77,7 +87,10 @@ public ClientStream<BidiReadObjectRequest> getRequestStream() {
7787
if (requestStream == null) {
7888
monitoringResponseObserver =
7989
new MonitoringResponseObserver(new BidiReadObjectResponseObserver());
80-
requestStream = callable.splitCall(monitoringResponseObserver, context);
90+
responseObserver =
91+
GrpcUtils.decorateAsStateChecking(
92+
new RedirectHandlingResponseObserver(monitoringResponseObserver));
93+
requestStream = callable.splitCall(responseObserver, context);
8194
}
8295
return requestStream;
8396
}
@@ -105,7 +118,11 @@ public void close() throws IOException {
105118
@Override
106119
public void send(BidiReadObjectRequest request) {
107120
checkOpen();
108-
getRequestStream().send(request);
121+
if (requestStream == null) {
122+
restart();
123+
} else {
124+
getRequestStream().send(request);
125+
}
109126
}
110127

111128
@Override
@@ -162,7 +179,7 @@ private void checkOpen() {
162179
}
163180

164181
private void restart() {
165-
requestStream = null;
182+
reset();
166183

167184
BidiReadObjectRequest openRequest = state.getOpenRequest();
168185
BidiReadObjectRequest.Builder b = openRequest.toBuilder().clearReadRanges();
@@ -186,28 +203,28 @@ private void restart() {
186203
}
187204

188205
BidiReadObjectRequest restartRequest = b.build();
189-
synchronized (this) {
190-
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream();
191-
requestStream1.send(restartRequest);
192-
// todo: put this in a retry loop
193-
ApiFutureUtils.await(monitoringResponseObserver.openSignal);
194-
}
206+
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream();
207+
requestStream1.send(restartRequest);
208+
}
209+
210+
private void reset() {
211+
requestStream = null;
195212
}
196213

197214
private final class BidiReadObjectResponseObserver
198-
extends StateCheckingResponseObserver<BidiReadObjectResponse> {
215+
implements ResponseObserver<BidiReadObjectResponse> {
199216

200217
private BidiReadObjectResponseObserver() {}
201218

202219
@Override
203-
public void onStartImpl(StreamController controller) {
220+
public void onStart(StreamController controller) {
204221
BlobDescriptorStream.this.controller = controller;
205222
controller.disableAutoInboundFlowControl();
206223
controller.request(2);
207224
}
208225

209226
@Override
210-
protected void onResponseImpl(BidiReadObjectResponse response) {
227+
public void onResponse(BidiReadObjectResponse response) {
211228
controller.request(1);
212229
try (ResponseContentLifecycleHandle<BidiReadObjectResponse> handle =
213230
bidiResponseContentLifecycleManager.get(response)) {
@@ -237,16 +254,12 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
237254
// invoke eof on exec, the resolving future could have a downstream callback
238255
// that we don't want to block this grpc thread
239256
executor.execute(
240-
() -> {
241-
try {
242-
read.eof();
243-
// don't remove the outstanding read until the future has been resolved
244-
state.removeOutstandingRead(id);
245-
} catch (IOException e) {
246-
// TODO: sync this up with stream restarts when the time comes
247-
throw StorageException.coalesce(e);
248-
}
249-
});
257+
StorageException.liftToRunnable(
258+
() -> {
259+
read.eof();
260+
// don't remove the outstanding read until the future has been resolved
261+
state.removeOutstandingRead(id);
262+
}));
250263
}
251264
}
252265
} catch (IOException e) {
@@ -256,27 +269,39 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
256269
}
257270

258271
@Override
259-
protected void onErrorImpl(Throwable t) {
260-
BidiReadObjectRedirectedError bidiReadObjectRedirectedError =
261-
GrpcUtils.getBidiReadObjectRedirectedError(t);
262-
if (bidiReadObjectRedirectedError != null) {
263-
if (bidiReadObjectRedirectedError.hasReadHandle()) {
264-
state.setBidiReadHandle(bidiReadObjectRedirectedError.getReadHandle());
265-
}
266-
if (bidiReadObjectRedirectedError.hasRoutingToken()) {
267-
state.setRoutingToken(bidiReadObjectRedirectedError.getRoutingToken());
272+
public void onError(Throwable t) {
273+
BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
274+
if (error == null) {
275+
return;
276+
}
277+
278+
List<ReadRangeError> rangeErrors = error.getReadRangeErrorsList();
279+
if (rangeErrors.isEmpty()) {
280+
return;
281+
}
282+
for (ReadRangeError rangeError : rangeErrors) {
283+
Status status = rangeError.getStatus();
284+
long id = rangeError.getReadId();
285+
BlobDescriptorStreamRead read = state.getOutstandingRead(id);
286+
if (read == null) {
287+
continue;
268288
}
269-
executor.execute(BlobDescriptorStream.this::restart);
289+
executor.execute(
290+
StorageException.liftToRunnable(
291+
() -> {
292+
read.fail(status);
293+
state.removeOutstandingRead(id);
294+
}));
270295
}
296+
reset();
271297
}
272298

273299
@Override
274-
protected void onCompleteImpl() {}
300+
public void onComplete() {}
275301
}
276302

277-
private class MonitoringResponseObserver
278-
extends StateCheckingResponseObserver<BidiReadObjectResponse> {
279-
private final BidiReadObjectResponseObserver delegate;
303+
private class MonitoringResponseObserver implements ResponseObserver<BidiReadObjectResponse> {
304+
private final BlobDescriptorStream.BidiReadObjectResponseObserver delegate;
280305
private final SettableApiFuture<Void> openSignal;
281306
private final SettableApiFuture<Void> closeSignal;
282307

@@ -287,46 +312,106 @@ private MonitoringResponseObserver(BidiReadObjectResponseObserver delegate) {
287312
}
288313

289314
@Override
290-
protected void onStartImpl(StreamController controller) {
315+
public void onStart(StreamController controller) {
291316
delegate.onStart(controller);
292317
}
293318

294319
@Override
295-
protected void onResponseImpl(BidiReadObjectResponse response) {
320+
public void onResponse(BidiReadObjectResponse response) {
296321
delegate.onResponse(response);
297322
openSignal.set(null);
298323
blobDescriptorResolveFuture.set(null);
299324
}
300325

301326
@Override
302-
protected void onErrorImpl(Throwable t) {
303-
if (GrpcUtils.isBidiReadObjectRedirect(t)) {
304-
delegate.onError(t);
305-
} else {
306-
delegate.onError(t);
307-
blobDescriptorResolveFuture.setException(t);
308-
openSignal.setException(t);
309-
closeSignal.setException(t);
310-
}
327+
public void onError(Throwable t) {
328+
delegate.onError(t);
329+
blobDescriptorResolveFuture.setException(t);
330+
openSignal.setException(t);
331+
closeSignal.setException(t);
311332
}
312333

313334
@Override
314-
protected void onCompleteImpl() {
335+
public void onComplete() {
315336
delegate.onComplete();
316337
blobDescriptorResolveFuture.set(null);
317338
openSignal.set(null);
318339
closeSignal.set(null);
319340
}
320341
}
321342

343+
private final class RedirectHandlingResponseObserver
344+
implements ResponseObserver<BidiReadObjectResponse> {
345+
private final ResponseObserver<BidiReadObjectResponse> delegate;
346+
347+
private RedirectHandlingResponseObserver(ResponseObserver<BidiReadObjectResponse> delegate) {
348+
this.delegate = delegate;
349+
}
350+
351+
@Override
352+
public void onStart(StreamController controller) {
353+
delegate.onStart(controller);
354+
}
355+
356+
@Override
357+
public void onResponse(BidiReadObjectResponse response) {
358+
redirectCounter.set(0);
359+
delegate.onResponse(response);
360+
}
361+
362+
@Override
363+
public void onError(Throwable t) {
364+
BidiReadObjectRedirectedError error = GrpcUtils.getBidiReadObjectRedirectedError(t);
365+
if (error == null) {
366+
delegate.onError(t);
367+
return;
368+
}
369+
int redirectCount = redirectCounter.incrementAndGet();
370+
if (redirectCount > maxRedirectsAllowed) {
371+
// attach the fact we're ignoring the redirect to the original exception as a suppressed
372+
// Exception. The lower level handler can then perform its usual handling, but if things
373+
// bubble all the way up to the invoker we'll be able to see it in a bug report.
374+
t.addSuppressed(new MaxRedirectsExceededException(maxRedirectsAllowed, redirectCount));
375+
delegate.onError(t);
376+
return;
377+
}
378+
if (error.hasReadHandle()) {
379+
state.setBidiReadHandle(error.getReadHandle());
380+
}
381+
if (error.hasRoutingToken()) {
382+
state.setRoutingToken(error.getRoutingToken());
383+
}
384+
executor.execute(BlobDescriptorStream.this::restart);
385+
}
386+
387+
@Override
388+
public void onComplete() {
389+
delegate.onComplete();
390+
}
391+
}
392+
322393
static BlobDescriptorStream create(
323394
Executor executor,
324395
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
325396
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
326397
GrpcCallContext context,
327398
BlobDescriptorState state) {
328399

400+
int maxRedirectsAllowed = 3; // TODO: make this configurable in the ultimate public surface
329401
return new BlobDescriptorStream(
330-
state, executor, bidiResponseContentLifecycleManager, callable, context);
402+
state,
403+
executor,
404+
bidiResponseContentLifecycleManager,
405+
callable,
406+
context,
407+
maxRedirectsAllowed);
408+
}
409+
410+
static final class MaxRedirectsExceededException extends RuntimeException {
411+
private MaxRedirectsExceededException(int maxRedirectAllowed, int actualRedirects) {
412+
super(
413+
String.format(
414+
"max redirects exceeded (max: %d, actual: %d)", maxRedirectAllowed, actualRedirects));
415+
}
331416
}
332417
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,15 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.SettableApiFuture;
20+
import com.google.api.gax.grpc.GrpcStatusCode;
21+
import com.google.api.gax.rpc.ApiException;
22+
import com.google.api.gax.rpc.ApiExceptionFactory;
2023
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2124
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
2225
import com.google.protobuf.ByteString;
26+
import com.google.rpc.Status;
2327
import com.google.storage.v2.ReadRange;
28+
import io.grpc.StatusRuntimeException;
2429
import java.io.Closeable;
2530
import java.io.IOException;
2631
import java.util.ArrayList;
@@ -45,6 +50,8 @@ private BlobDescriptorStreamRead(long readId, long readOffset, long readLimit) {
4550

4651
abstract void eof() throws IOException;
4752

53+
abstract void fail(Status status) throws IOException;
54+
4855
final ReadRange makeReadRange() {
4956
return ReadRange.newBuilder()
5057
.setReadId(readId)
@@ -81,6 +88,16 @@ private AccumulatingRead(
8188
super(readId, readOffset, readLimit);
8289
this.complete = complete;
8390
}
91+
92+
@Override
93+
void fail(Status status) throws IOException {
94+
io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode());
95+
StatusRuntimeException cause = grpcStatus.asRuntimeException();
96+
ApiException apiException =
97+
ApiExceptionFactory.createException(
98+
cause, GrpcStatusCode.of(grpcStatus.getCode()), false);
99+
complete.setException(apiException);
100+
}
84101
}
85102

86103
/**

0 commit comments

Comments
 (0)