Skip to content

Commit 8d4d7a3

Browse files
committed
chore: integrate stream wide retry/backoff awareness for read object ranges
1 parent affb14f commit 8d4d7a3

File tree

9 files changed

+460
-64
lines changed

9 files changed

+460
-64
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.common.base.Preconditions.checkState;
20+
1921
import com.google.api.core.ApiFuture;
2022
import com.google.api.core.ApiFutures;
2123
import com.google.api.core.SettableApiFuture;
@@ -55,6 +57,7 @@ private BlobDescriptorImpl(
5557

5658
@Override
5759
public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
60+
checkState(stream.isOpen(), "stream already closed");
5861
long readId = state.newReadId();
5962
ReadCursor readCursor = getReadCursor(range, state);
6063
if (!readCursor.hasRemaining()) {
@@ -72,6 +75,7 @@ public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
7275
}
7376

7477
public ApiFuture<DisposableByteString> readRangeAsByteString(RangeSpec range) {
78+
checkState(stream.isOpen(), "stream already closed");
7579
long readId = state.newReadId();
7680
ReadCursor readCursor = getReadCursor(range, state);
7781
if (!readCursor.hasRemaining()) {
@@ -126,7 +130,8 @@ static ApiFuture<BlobDescriptor> create(
126130
RetryContextProvider retryContextProvider) {
127131
BlobDescriptorState state = new BlobDescriptorState(context, openRequest);
128132

129-
BlobDescriptorStream stream = BlobDescriptorStream.create(executor, callable, state);
133+
BlobDescriptorStream stream =
134+
BlobDescriptorStream.create(executor, callable, state, retryContextProvider.create());
130135

131136
ApiFuture<BlobDescriptor> blobDescriptorFuture =
132137
ApiFutures.transform(

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,12 @@
2727
import com.google.storage.v2.BidiReadObjectSpec;
2828
import com.google.storage.v2.Object;
2929
import java.util.HashMap;
30+
import java.util.Iterator;
3031
import java.util.List;
3132
import java.util.Map;
33+
import java.util.Map.Entry;
3234
import java.util.Objects;
35+
import java.util.concurrent.Executor;
3336
import java.util.concurrent.atomic.AtomicLong;
3437
import java.util.concurrent.atomic.AtomicReference;
3538
import java.util.concurrent.locks.ReentrantLock;
@@ -176,6 +179,21 @@ BlobDescriptorStreamRead assignNewReadId(long oldReadId) {
176179
}
177180
}
178181

182+
/**
 * Removes every outstanding read from this state and schedules each one to be failed.
 *
 * <p>While holding {@code lock}, each entry of {@code outstandingReads} is removed through the
 * iterator, and a task that calls {@code read.fail(...)} is submitted to {@code executor}.
 *
 * @param executor executor on which each read's {@code fail} call is run
 * @param terminalFailure the failure handed to each read after being passed through {@code
 *     StorageException.coalesce}
 */
public void failAll(Executor executor, Throwable terminalFailure) {
183+
lock.lock();
184+
try {
185+
Iterator<Entry<Long, BlobDescriptorStreamRead>> iter = outstandingReads.entrySet().iterator();
186+
while (iter.hasNext()) {
187+
Entry<Long, BlobDescriptorStreamRead> entry = iter.next();
188+
// remove through the iterator so the map can be emptied while it is being traversed
189+
iter.remove();
189+
BlobDescriptorStreamRead read = entry.getValue();
190+
// only the submission happens here; coalesce(...) and read.fail(...) are evaluated inside
// the submitted task rather than inline in this loop
executor.execute(() -> read.fail(StorageException.coalesce(terminalFailure)));
191+
}
192+
} finally {
193+
lock.unlock();
194+
}
195+
}
196+
179197
static final class OpenArguments {
180198
private final GrpcCallContext ctx;
181199
private final BidiReadObjectRequest req;

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 84 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException;
3131
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
3232
import com.google.cloud.storage.RetryContext.OnSuccess;
33+
import com.google.common.annotations.VisibleForTesting;
3334
import com.google.common.base.Preconditions;
3435
import com.google.protobuf.ByteString;
3536
import com.google.rpc.Status;
@@ -61,6 +62,7 @@ final class BlobDescriptorStream
6162
private final ScheduledExecutorService executor;
6263
private final ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse>
6364
callable;
65+
private final RetryContext streamRetryContext;
6466
private final int maxRedirectsAllowed;
6567

6668
private volatile boolean open;
@@ -74,14 +76,16 @@ private BlobDescriptorStream(
7476
BlobDescriptorState state,
7577
ScheduledExecutorService executor,
7678
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
77-
int maxRedirectsAllowed) {
79+
int maxRedirectsAllowed,
80+
RetryContext backoff) {
7881
this.state = state;
7982
this.executor = executor;
8083
this.callable = callable;
84+
this.streamRetryContext = backoff;
8185
this.blobDescriptorResolveFuture = SettableApiFuture.create();
86+
this.maxRedirectsAllowed = maxRedirectsAllowed;
8287
this.open = true;
8388
this.redirectCounter = new AtomicInteger();
84-
this.maxRedirectsAllowed = maxRedirectsAllowed;
8589
}
8690

8791
// TODO: make this more elegant
@@ -104,20 +108,20 @@ private ClientStream<BidiReadObjectRequest> getRequestStream(@Nullable GrpcCallC
104108
}
105109

106110
@Override
107-
public void close() throws IOException {
111+
// Closes this stream. Returns immediately if `open` is already false; otherwise flips `open`
// to false first and then delegates the teardown to cleanUp(). This version no longer
// declares IOException.
// NOTE(review): the check and the write of `open` are two separate steps on a volatile field,
// so two concurrent callers could presumably both reach cleanUp() — confirm whether close()
// can be invoked concurrently.
public void close() {
108112
if (!open) {
109113
return;
110114
}
115+
// mark closed before tearing down so isOpen() already reports false while cleanUp() runs
open = false;
116+
cleanUp();
117+
}
111118

112-
try {
113-
cancel(true);
114-
if (requestStream != null) {
115-
requestStream.closeSend();
116-
ApiFutureUtils.await(monitoringResponseObserver.closeSignal);
117-
requestStream = null;
118-
}
119-
} finally {
120-
open = false;
119+
// Shared teardown used by close() and failAll(Throwable): calls cancel(true) and, when a
// request stream is currently set, half-closes it, awaits the response observer's closeSignal
// and then clears the requestStream field. Does not touch the `open` flag; callers set it.
private void cleanUp() {
120+
cancel(true);
121+
if (requestStream != null) {
122+
requestStream.closeSend();
123+
// presumably blocks until the monitoring observer signals that the stream has closed —
// TODO confirm ApiFutureUtils.await semantics
ApiFutureUtils.await(monitoringResponseObserver.closeSignal);
124+
requestStream = null;
121125
}
122126
}
123127

@@ -183,20 +187,35 @@ public boolean isDone() {
183187
return blobDescriptorResolveFuture.isDone();
184188
}
185189

190+
// Package-private accessor for the `open` flag; BlobDescriptorImpl checks it before starting
// a new range read.
boolean isOpen() {
191+
return open;
192+
}
193+
186194
private void checkOpen() {
187-
Preconditions.checkState(open, "not open");
195+
Preconditions.checkState(open, "Stream closed");
188196
}
189197

190-
private void restart() {
191-
reset();
198+
@VisibleForTesting
199+
void restart() {
200+
Preconditions.checkState(
201+
requestStream == null, "attempting to restart stream when stream is already active");
192202

193203
OpenArguments openArguments = state.getOpenArguments();
194-
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream(openArguments.getCtx());
195-
requestStream1.send(openArguments.getReq());
204+
BidiReadObjectRequest req = openArguments.getReq();
205+
if (!req.getReadRangesList().isEmpty() || !blobDescriptorResolveFuture.isDone()) {
206+
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream(openArguments.getCtx());
207+
requestStream1.send(req);
208+
}
196209
}
197210

198-
private void reset() {
199-
requestStream = null;
211+
// Terminal-failure handler, passed as the failure callback to streamRetryContext.recordError.
// Marks the stream as not open, completes blobDescriptorResolveFuture exceptionally with the
// given failure, asks the state to fail all of its outstanding reads on `executor`, and
// finally runs cleanUp().
private void failAll(Throwable terminalFailure) {
212+
open = false;
213+
try {
214+
blobDescriptorResolveFuture.setException(terminalFailure);
215+
state.failAll(executor, terminalFailure);
216+
} finally {
217+
// teardown runs even if propagating the failure above throws
cleanUp();
218+
}
200219
}
201220

202221
private final class BidiReadObjectResponseObserver
@@ -319,39 +338,55 @@ public void onResponse(BidiReadObjectResponse response) {
319338
}
320339
}
321340
} catch (IOException e) {
322-
// TODO: sync this up with stream restarts when the time comes
323-
throw StorageException.coalesce(e);
341+
//
342+
// When using zero-copy, the returned InputStream is of type InputStream rather than its
343+
// concrete subclass. The subclass is `io.grpc.internal.ReadableBuffers.BufferInputStream`
344+
// which exclusively operates on a `io.grpc.internal.ReadableBuffer`. `ReadableBuffer`'s
345+
// close method does not throw.
346+
//
347+
// This is defined as an exhaustiveness compliance. {@code javac} dictates we handle an
348+
// `IOException`, even though the underlying classes won't throw it. If the behavior in grpc
349+
// at some point does throw, we catch it here and funnel it into the stream retry handling.
350+
//
351+
requestStream = null;
352+
streamRetryContext.recordError(
353+
e, BlobDescriptorStream.this::restart, BlobDescriptorStream.this::failAll);
324354
}
325355
}
326356

327357
@Override
328358
public void onError(Throwable t) {
329-
reset();
330-
BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
331-
if (error == null) {
332-
return;
333-
}
359+
requestStream = null;
360+
try {
361+
BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
362+
if (error == null) {
363+
return;
364+
}
334365

335-
List<ReadRangeError> rangeErrors = error.getReadRangeErrorsList();
336-
if (rangeErrors.isEmpty()) {
337-
return;
338-
}
339-
for (ReadRangeError rangeError : rangeErrors) {
340-
Status status = rangeError.getStatus();
341-
long id = rangeError.getReadId();
342-
BlobDescriptorStreamRead read = state.getOutstandingRead(id);
343-
if (read == null) {
344-
continue;
366+
List<ReadRangeError> rangeErrors = error.getReadRangeErrorsList();
367+
if (rangeErrors.isEmpty()) {
368+
return;
345369
}
346-
// mark read as failed, but don't resolve its future now. Schedule the delivery of the
347-
// failure in executor to ensure any downstream future doesn't block this IO thread.
348-
read.preFail();
349-
executor.execute(
350-
StorageException.liftToRunnable(
351-
() ->
352-
state
353-
.removeOutstandingReadOnFailure(id, read::fail)
354-
.onFailure(GrpcUtils.statusToApiException(status))));
370+
for (ReadRangeError rangeError : rangeErrors) {
371+
Status status = rangeError.getStatus();
372+
long id = rangeError.getReadId();
373+
BlobDescriptorStreamRead read = state.getOutstandingRead(id);
374+
if (read == null) {
375+
continue;
376+
}
377+
// mark read as failed, but don't resolve its future now. Schedule the delivery of the
378+
// failure in executor to ensure any downstream future doesn't block this IO thread.
379+
read.preFail();
380+
executor.execute(
381+
StorageException.liftToRunnable(
382+
() ->
383+
state
384+
.removeOutstandingReadOnFailure(id, read::fail)
385+
.onFailure(GrpcUtils.statusToApiException(status))));
386+
}
387+
} finally {
388+
streamRetryContext.recordError(
389+
t, BlobDescriptorStream.this::restart, BlobDescriptorStream.this::failAll);
355390
}
356391
}
357392

@@ -435,6 +470,7 @@ public void onError(Throwable t) {
435470
delegate.onError(t);
436471
return;
437472
}
473+
requestStream = null;
438474
int redirectCount = redirectCounter.incrementAndGet();
439475
if (redirectCount > maxRedirectsAllowed) {
440476
// attach the fact we're ignoring the redirect to the original exception as a suppressed
@@ -462,10 +498,11 @@ public void onComplete() {
462498
static BlobDescriptorStream create(
463499
ScheduledExecutorService executor,
464500
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
465-
BlobDescriptorState state) {
501+
BlobDescriptorState state,
502+
RetryContext retryContext) {
466503

467504
int maxRedirectsAllowed = 3; // TODO: make this configurable in the ultimate public surface
468-
return new BlobDescriptorStream(state, executor, callable, maxRedirectsAllowed);
505+
return new BlobDescriptorStream(state, executor, callable, maxRedirectsAllowed, retryContext);
469506
}
470507

471508
static final class MaxRedirectsExceededException extends RuntimeException {

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,11 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3838
protected boolean closed;
3939
protected boolean tombstoned;
4040

41-
private BlobDescriptorStreamRead(long readId, ReadCursor readCursor, RetryContext retryContext) {
41+
BlobDescriptorStreamRead(long readId, ReadCursor readCursor, RetryContext retryContext) {
4242
this(readId, readCursor, Collections.synchronizedList(new ArrayList<>()), retryContext, false);
4343
}
4444

45-
private BlobDescriptorStreamRead(
45+
BlobDescriptorStreamRead(
4646
long readId,
4747
ReadCursor readCursor,
4848
List<ChildRef> childRefs,

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -227,15 +227,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
227227
RetryContext.providerFrom(
228228
executor,
229229
getOptions(),
230-
new BasicResultRetryAlgorithm<Object>() {
231-
@Override
232-
public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) {
233-
// this is only retryable with read object range, not other requests
234-
return previousThrowable instanceof UncheckedChecksumMismatchException
235-
|| previousThrowable instanceof OutOfRangeException
236-
|| retryAlgorithmManager.idempotent().shouldRetry(previousThrowable, null);
237-
}
238-
});
230+
new ReadObjectRangeResultRetryAlgorithmDecorator(retryAlgorithmManager.idempotent()));
239231
}
240232

241233
@Override
@@ -2092,4 +2084,22 @@ private ZeroCopyServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> r
20922084
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
20932085
responseContentLifecycleManager);
20942086
}
2087+
2088+
/**
 * Retry algorithm that adds two retryable failure types on top of a delegate algorithm.
 *
 * <p>{@link #shouldRetry} answers {@code true} for {@code UncheckedChecksumMismatchException}
 * and {@code OutOfRangeException}; every other throwable is decided by the delegate.
 */
private static class ReadObjectRangeResultRetryAlgorithmDecorator
2089+
extends BasicResultRetryAlgorithm<Object> {
2090+
2091+
// algorithm consulted for every throwable that is not one of the two special cases
private final ResultRetryAlgorithm<?> delegate;
2092+
2093+
private ReadObjectRangeResultRetryAlgorithmDecorator(ResultRetryAlgorithm<?> delegate) {
2094+
this.delegate = delegate;
2095+
}
2096+
2097+
/**
 * @param previousThrowable the failure to classify
 * @param previousResponse not read by this implementation; {@code null} is passed to the
 *     delegate in its place
 * @return {@code true} if the failure is one of the two read-range specific types, otherwise
 *     the delegate's answer for {@code StorageException.coalesce(previousThrowable)}
 */
@Override
2098+
public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) {
2099+
// this is only retryable with read object range, not other requests
2100+
return previousThrowable instanceof UncheckedChecksumMismatchException
2101+
|| previousThrowable instanceof OutOfRangeException
2102+
|| delegate.shouldRetry(StorageException.coalesce(previousThrowable), null);
2103+
}
2104+
}
20952105
}

0 commit comments

Comments
 (0)