Skip to content

Commit f7c37f5

Browse files
committed
chore: add retry tracking to BlobDescriptorStream out of range mismatch
1 parent a7a8945 commit f7c37f5

File tree

4 files changed

+87
-18
lines changed

4 files changed

+87
-18
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.SettableApiFuture;
2121
import com.google.api.gax.grpc.GrpcCallContext;
22+
import com.google.api.gax.grpc.GrpcStatusCode;
23+
import com.google.api.gax.rpc.ApiExceptionFactory;
2224
import com.google.api.gax.rpc.ClientStream;
2325
import com.google.api.gax.rpc.ResponseObserver;
2426
import com.google.api.gax.rpc.StreamController;
@@ -300,21 +302,22 @@ public void onResponse(BidiReadObjectResponse response) {
300302
requestStream.send(requestWithNewReadId);
301303
}
302304
} else {
303-
Status status =
304-
Status.newBuilder()
305-
.setCode(Code.OUT_OF_RANGE.value())
306-
.setMessage(
307-
String.format("position = %d, readRange.read_offset = %d", position, begin))
308-
.build();
309-
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
310-
// todo: record failure for read
311-
BidiReadObjectRequest requestWithNewReadId =
312-
BidiReadObjectRequest.newBuilder()
313-
.addReadRanges(readWithNewId.makeReadRange())
314-
.build();
315-
requestStream.send(requestWithNewReadId);
316-
// todo
317-
continue;
305+
try {
306+
read.recordError(
307+
ApiExceptionFactory.createException(
308+
String.format("position = %d, readRange.read_offset = %d", position, begin),
309+
null,
310+
GrpcStatusCode.of(Code.OUT_OF_RANGE),
311+
true));
312+
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
313+
BidiReadObjectRequest requestWithNewReadId =
314+
BidiReadObjectRequest.newBuilder()
315+
.addReadRanges(readWithNewId.makeReadRange())
316+
.build();
317+
requestStream.send(requestWithNewReadId);
318+
} catch (Throwable e) {
319+
read.fail(e);
320+
}
318321
}
319322
if (d.getRangeEnd() && !read.getReadCursor().hasRemaining()) {
320323
final BlobDescriptorStreamRead finalRead = read;

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public void close() throws IOException {
9999
}
100100
}
101101

102+
abstract void recordError(Throwable e);
103+
102104
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
103105
long readId,
104106
ReadCursor readCursor,
@@ -115,8 +117,6 @@ static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRe
115117
return new ZeroCopyByteStringAccumulatingRead(readId, readCursor, retryContext, complete);
116118
}
117119

118-
public abstract void recordError(Throwable e);
119-
120120
/** Base class of a read that will accumulate before completing by resolving a future */
121121
abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead {
122122
protected final SettableApiFuture<Result> complete;

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.google.api.gax.rpc.ApiExceptions;
4343
import com.google.api.gax.rpc.ClientStreamingCallable;
4444
import com.google.api.gax.rpc.NotFoundException;
45+
import com.google.api.gax.rpc.OutOfRangeException;
4546
import com.google.api.gax.rpc.StatusCode;
4647
import com.google.api.gax.rpc.UnaryCallable;
4748
import com.google.cloud.BaseService;
@@ -229,6 +230,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
229230
public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) {
230231
// this is only retryable with read object range, not other requests
231232
return previousThrowable instanceof ChecksumMismatchException
233+
|| previousThrowable instanceof OutOfRangeException
232234
|| retryAlgorithmManager.idempotent().shouldRetry(previousThrowable, null);
233235
}
234236
});

google-cloud-storage/src/test/java/com/google/cloud/storage/ITBlobDescriptorFakeTest.java

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.api.gax.rpc.AbortedException;
2828
import com.google.api.gax.rpc.ApiExceptions;
2929
import com.google.api.gax.rpc.CancelledException;
30+
import com.google.api.gax.rpc.OutOfRangeException;
3031
import com.google.api.gax.rpc.UnavailableException;
3132
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
3233
import com.google.cloud.storage.Hasher.ChecksumMismatchException;
@@ -628,7 +629,6 @@ public void onNext(BidiReadObjectRequest request) {
628629

629630
assertThat(cancelledException).hasCauseThat().isInstanceOf(ChecksumMismatchException.class);
630631
Throwable[] suppressed = cancelledException.getSuppressed();
631-
assertThat(suppressed).asList().hasSize(3);
632632
List<String> suppressedMessages =
633633
Arrays.stream(suppressed).map(Throwable::getMessage).collect(Collectors.toList());
634634
assertThat(suppressedMessages)
@@ -640,6 +640,70 @@ public void onNext(BidiReadObjectRequest request) {
640640
}
641641
}
642642

643+
@Test
644+
public void retrySettingsApplicable_objectRangeData_offset_notAligned_gt() throws Exception {
645+
646+
ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 11, 20);
647+
BidiReadObjectRequest req2 = read(1, 10, 20);
648+
BidiReadObjectResponse res2 =
649+
BidiReadObjectResponse.newBuilder()
650+
.addObjectDataRanges(
651+
ObjectRangeData.newBuilder()
652+
.setChecksummedData(content2.asChecksummedData())
653+
.setReadRange(getReadRange(1, 11, content2))
654+
.setRangeEnd(true)
655+
.build())
656+
.build();
657+
658+
ChecksummedTestContent content3 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 12, 20);
659+
BidiReadObjectRequest req3 = read(2, 10, 20);
660+
BidiReadObjectResponse res3 =
661+
BidiReadObjectResponse.newBuilder()
662+
.addObjectDataRanges(
663+
ObjectRangeData.newBuilder()
664+
.setChecksummedData(content3.asChecksummedData())
665+
.setReadRange(getReadRange(2, 12, content3))
666+
.setRangeEnd(true)
667+
.build())
668+
.build();
669+
670+
ImmutableMap<BidiReadObjectRequest, BidiReadObjectResponse> db =
671+
ImmutableMap.<BidiReadObjectRequest, BidiReadObjectResponse>builder()
672+
.put(REQ_OPEN, RES_OPEN)
673+
.put(req2, res2)
674+
.put(req3, res3)
675+
.buildOrThrow();
676+
677+
try (FakeServer fakeServer = FakeServer.of(FakeStorage.from(db));
678+
Storage storage =
679+
fakeServer
680+
.getGrpcStorageOptions()
681+
.toBuilder()
682+
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(2).build())
683+
.build()
684+
.getService()) {
685+
686+
BlobId id = BlobId.of("b", "o");
687+
ApiFuture<BlobDescriptor> futureObjectDescriptor = storage.getBlobDescriptor(id);
688+
689+
try (BlobDescriptor bd = futureObjectDescriptor.get(5, TimeUnit.SECONDS)) {
690+
ApiFuture<byte[]> future = bd.readRangeAsBytes(RangeSpec.of(10L, 20L));
691+
692+
CancelledException cancelledException =
693+
assertThrows(
694+
CancelledException.class, () -> ApiExceptions.callAndTranslateApiException(future));
695+
696+
assertThat(cancelledException).hasCauseThat().isInstanceOf(OutOfRangeException.class);
697+
Throwable[] suppressed = cancelledException.getSuppressed();
698+
List<String> suppressedMessages =
699+
Arrays.stream(suppressed).map(Throwable::getMessage).collect(Collectors.toList());
700+
assertThat(suppressedMessages)
701+
.containsExactly(
702+
"position = 10, readRange.read_offset = 11", "Asynchronous task failed");
703+
}
704+
}
705+
}
706+
643707
private static void runTestAgainstFakeServer(
644708
FakeStorage fakeStorage, RangeSpec range, ChecksummedTestContent expected) throws Exception {
645709

0 commit comments

Comments
 (0)