Skip to content

Commit 3616097

Browse files
committed
chore: ensure BlobDescriptor opening can be retried
1 parent 6a80994 commit 3616097

File tree

2 files changed

+87
-2
lines changed

2 files changed

+87
-2
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import com.google.storage.v2.ReadRange;
4545
import com.google.storage.v2.ReadRangeError;
4646
import io.grpc.Status.Code;
47+
import io.grpc.StatusRuntimeException;
4748
import java.io.IOException;
4849
import java.nio.channels.AsynchronousCloseException;
4950
import java.util.List;
@@ -441,15 +442,28 @@ public void onResponse(BidiReadObjectResponse response) {
441442
@Override
442443
public void onError(Throwable t) {
443444
delegate.onError(t);
444-
blobDescriptorResolveFuture.setException(t);
445445
openSignal.setException(t);
446446
closeSignal.setException(t);
447447
}
448448

449449
@Override
450450
public void onComplete() {
451451
delegate.onComplete();
452-
blobDescriptorResolveFuture.set(null);
452+
if (state.getMetadata() == null) {
453+
StatusRuntimeException cause =
454+
Code.UNAVAILABLE
455+
.toStatus()
456+
.withDescription("onComplete without prior onNext")
457+
.asRuntimeException();
458+
ApiException apiException =
459+
ApiExceptionFactory.createException(cause, GrpcStatusCode.of(Code.UNAVAILABLE), false);
460+
StorageException storageException =
461+
new StorageException(0, cause.getMessage(), apiException);
462+
streamRetryContext.recordError(
463+
storageException,
464+
BlobDescriptorStream.this::restart,
465+
blobDescriptorResolveFuture::setException);
466+
}
453467
openSignal.set(null);
454468
closeSignal.set(null);
455469
}
@@ -489,6 +503,7 @@ public void onError(Throwable t) {
489503
// bubble all the way up to the invoker we'll be able to see it in a bug report.
490504
t.addSuppressed(new MaxRedirectsExceededException(maxRedirectsAllowed, redirectCount));
491505
delegate.onError(t);
506+
blobDescriptorResolveFuture.setException(t);
492507
return;
493508
}
494509
if (error.hasReadHandle()) {

google-cloud-storage/src/test/java/com/google/cloud/storage/ITBlobDescriptorFakeTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.storage;
1818

1919
import static com.google.cloud.storage.ByteSizeConstants._2MiB;
20+
import static com.google.cloud.storage.TestUtils.apiException;
2021
import static com.google.cloud.storage.TestUtils.assertAll;
2122
import static com.google.cloud.storage.TestUtils.getChecksummedData;
2223
import static com.google.cloud.storage.TestUtils.xxd;
@@ -28,6 +29,7 @@
2829
import com.google.api.core.ApiFutures;
2930
import com.google.api.gax.retrying.RetrySettings;
3031
import com.google.api.gax.rpc.AbortedException;
32+
import com.google.api.gax.rpc.ApiException;
3133
import com.google.api.gax.rpc.DataLossException;
3234
import com.google.api.gax.rpc.OutOfRangeException;
3335
import com.google.api.gax.rpc.UnavailableException;
@@ -1067,6 +1069,74 @@ public void streaming() throws Exception {
10671069
}
10681070
}
10691071

1072+
@Test
1073+
public void retryableErrorWhileOpeningIsRetried() throws Exception {
1074+
AtomicInteger reqCounter = new AtomicInteger(0);
1075+
FakeStorage fake =
1076+
FakeStorage.of(
1077+
ImmutableMap.of(
1078+
REQ_OPEN,
1079+
respond -> {
1080+
int i = reqCounter.incrementAndGet();
1081+
if (i <= 1) {
1082+
ApiException apiException =
1083+
apiException(Code.UNAVAILABLE, String.format("{unavailable %d}", i));
1084+
respond.onError(apiException);
1085+
} else {
1086+
respond.onNext(RES_OPEN);
1087+
}
1088+
}));
1089+
1090+
try (FakeServer fakeServer = FakeServer.of(fake);
1091+
Storage storage =
1092+
fakeServer
1093+
.getGrpcStorageOptions()
1094+
.toBuilder()
1095+
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(3).build())
1096+
.build()
1097+
.getService()) {
1098+
1099+
BlobId id = BlobId.of("b", "o");
1100+
ApiFuture<BlobDescriptor> futureBlobDescriptor = storage.getBlobDescriptor(id);
1101+
try (BlobDescriptor bd = futureBlobDescriptor.get(20, TimeUnit.SECONDS)) {
1102+
assertThat(bd).isNotNull();
1103+
}
1104+
}
1105+
}
1106+
1107+
@Test
1108+
public void onCompleteWithoutAValue() throws Exception {
1109+
// I'm not sure if this is something that can actually happen in practice, but is being here
1110+
// to ensure it's at least accounted for, rather than a null pointer exception or something else
1111+
// equally cryptic.
1112+
FakeStorage fake = FakeStorage.of(ImmutableMap.of(REQ_OPEN, StreamObserver::onCompleted));
1113+
1114+
try (FakeServer fakeServer = FakeServer.of(fake);
1115+
Storage storage =
1116+
fakeServer
1117+
.getGrpcStorageOptions()
1118+
.toBuilder()
1119+
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(3).build())
1120+
.build()
1121+
.getService()) {
1122+
1123+
BlobId id = BlobId.of("b", "o");
1124+
ApiFuture<BlobDescriptor> futureBlobDescriptor = storage.getBlobDescriptor(id);
1125+
ExecutionException ee =
1126+
assertThrows(
1127+
ExecutionException.class, () -> futureBlobDescriptor.get(20, TimeUnit.SECONDS));
1128+
assertAll(
1129+
() -> assertThat(ee).hasCauseThat().isInstanceOf(StorageException.class),
1130+
() ->
1131+
assertThat(ee).hasCauseThat().hasCauseThat().isInstanceOf(UnavailableException.class),
1132+
() -> assertThat(((StorageException) ee.getCause()).getCode()).isEqualTo(0),
1133+
() -> {
1134+
String messages = TestUtils.messagesToText(ee.getCause());
1135+
assertThat(messages).contains("Unretryable error");
1136+
});
1137+
}
1138+
}
1139+
10701140
private static void runTestAgainstFakeServer(
10711141
FakeStorage fakeStorage, RangeSpec range, ChecksummedTestContent expected) throws Exception {
10721142

0 commit comments

Comments
 (0)