Skip to content

Commit 72491e7

Browse files
committed
chore: update failure handling to be two-phase because it can cross threads
Also update failure handlers to ensure a failed read is removed from stream state.
1 parent 2968790 commit 72491e7

File tree

7 files changed

+100
-34
lines changed

7 files changed

+100
-34
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
final class BlobDescriptorImpl implements BlobDescriptor {
4040

4141
private final BlobDescriptorStream stream;
42-
private final BlobDescriptorState state;
42+
@VisibleForTesting final BlobDescriptorState state;
4343
private final BlobInfo info;
4444
private final RetryContextProvider retryContextProvider;
4545

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkState;
2020

21+
import com.google.cloud.storage.RetryContext.OnFailure;
2122
import com.google.common.collect.ImmutableList;
2223
import com.google.storage.v2.BidiReadHandle;
2324
import com.google.storage.v2.BidiReadObjectRequest;
@@ -111,6 +112,13 @@ void removeOutstandingRead(long key) {
111112
}
112113
}
113114

115+
<T extends Throwable> OnFailure<T> removeOutstandingReadOnFailure(long key, OnFailure<T> onFail) {
116+
return t -> {
117+
removeOutstandingRead(key);
118+
onFail.onFailure(t);
119+
};
120+
}
121+
114122
void setRoutingToken(String routingToken) {
115123
this.routingToken.set(routingToken);
116124
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,10 @@ public void onResponse(BidiReadObjectResponse response) {
266266
// happen on a non-io thread
267267
Hasher.enabled().validateUnchecked(Crc32cValue.of(crc32C), content);
268268
} catch (UncheckedChecksumMismatchException e) {
269-
read.recordError(e, restartReadFromCurrentOffset(id), read::unsafeFail);
269+
read.recordError(
270+
e,
271+
restartReadFromCurrentOffset(id),
272+
state.removeOutstandingReadOnFailure(id, read::fail));
270273
continue;
271274
}
272275

@@ -306,7 +309,10 @@ public void onResponse(BidiReadObjectResponse response) {
306309
null,
307310
GrpcStatusCode.of(Code.OUT_OF_RANGE),
308311
true);
309-
read.recordError(apiException, restartReadFromCurrentOffset(id), read::unsafeFail);
312+
read.recordError(
313+
apiException,
314+
restartReadFromCurrentOffset(id),
315+
state.removeOutstandingReadOnFailure(id, read::fail));
310316
continue;
311317
} else {
312318
ApiException apiException =
@@ -315,7 +321,10 @@ public void onResponse(BidiReadObjectResponse response) {
315321
null,
316322
GrpcStatusCode.of(Code.OUT_OF_RANGE),
317323
true);
318-
read.recordError(apiException, restartReadFromCurrentOffset(id), read::unsafeFail);
324+
read.recordError(
325+
apiException,
326+
restartReadFromCurrentOffset(id),
327+
state.removeOutstandingReadOnFailure(id, read::fail));
319328
continue;
320329
}
321330

@@ -339,6 +348,7 @@ public void onResponse(BidiReadObjectResponse response) {
339348

340349
@Override
341350
public void onError(Throwable t) {
351+
reset();
342352
BidiReadObjectError error = GrpcUtils.getBidiReadObjectError(t);
343353
if (error == null) {
344354
return;
@@ -355,14 +365,16 @@ public void onError(Throwable t) {
355365
if (read == null) {
356366
continue;
357367
}
368+
// mark read as failed, but don't resolve its future now. Schedule the delivery of the
369+
// failure in executor to ensure any downstream future doesn't block this IO thread.
370+
read.preFail();
358371
executor.execute(
359372
StorageException.liftToRunnable(
360-
() -> {
361-
read.fail(status);
362-
state.removeOutstandingRead(id);
363-
}));
373+
() ->
374+
state
375+
.removeOutstandingReadOnFailure(id, read::fail)
376+
.onFailure(GrpcUtils.statusToApiException(status))));
364377
}
365-
reset();
366378
}
367379

368380
private OnSuccess restartReadFromCurrentOffset(long id) {

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,12 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.SettableApiFuture;
20-
import com.google.api.gax.grpc.GrpcStatusCode;
21-
import com.google.api.gax.rpc.ApiException;
22-
import com.google.api.gax.rpc.ApiExceptionFactory;
2320
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2421
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
2522
import com.google.cloud.storage.RetryContext.OnFailure;
2623
import com.google.cloud.storage.RetryContext.OnSuccess;
2724
import com.google.protobuf.ByteString;
28-
import com.google.rpc.Status;
2925
import com.google.storage.v2.ReadRange;
30-
import io.grpc.StatusRuntimeException;
3126
import java.io.Closeable;
3227
import java.io.IOException;
3328
import java.util.ArrayList;
@@ -71,27 +66,11 @@ ReadCursor getReadCursor() {
7166

7267
abstract void eof() throws IOException;
7368

74-
final void fail(Status status) throws IOException {
75-
io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode());
76-
if (!status.getMessage().isEmpty()) {
77-
grpcStatus = grpcStatus.withDescription(status.getMessage());
78-
}
79-
StatusRuntimeException cause = grpcStatus.asRuntimeException();
80-
ApiException apiException =
81-
ApiExceptionFactory.createException(cause, GrpcStatusCode.of(grpcStatus.getCode()), false);
82-
fail(apiException);
83-
}
84-
85-
final void unsafeFail(Throwable t) {
86-
try {
87-
fail(t);
88-
} catch (IOException e) {
89-
// todo: better exception than this
90-
throw new RuntimeException(e);
91-
}
69+
final void preFail() {
70+
tombstoned = true;
9271
}
9372

94-
abstract void fail(Throwable t) throws IOException;
73+
abstract void fail(Throwable t);
9574

9675
abstract BlobDescriptorStreamRead withNewReadId(long newReadId);
9776

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.gax.grpc.GrpcCallContext;
20+
import com.google.api.gax.grpc.GrpcStatusCode;
2021
import com.google.api.gax.rpc.ApiCallContext;
2122
import com.google.api.gax.rpc.ApiException;
23+
import com.google.api.gax.rpc.ApiExceptionFactory;
2224
import com.google.api.gax.rpc.ApiStreamObserver;
2325
import com.google.api.gax.rpc.BidiStream;
2426
import com.google.api.gax.rpc.BidiStreamObserver;
@@ -35,8 +37,10 @@
3537
import com.google.common.collect.ImmutableList;
3638
import com.google.common.collect.ImmutableMap;
3739
import com.google.protobuf.Message;
40+
import com.google.rpc.Status;
3841
import com.google.storage.v2.BidiReadObjectError;
3942
import com.google.storage.v2.BidiReadObjectRedirectedError;
43+
import io.grpc.StatusRuntimeException;
4044
import java.io.Closeable;
4145
import java.io.IOException;
4246
import java.util.Collection;
@@ -114,6 +118,16 @@ static BidiReadObjectError getBidiReadObjectError(Throwable t) {
114118
return findFirstPackedAny(t, BidiReadObjectError.class);
115119
}
116120

121+
static ApiException statusToApiException(Status status) {
122+
io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode());
123+
if (!status.getMessage().isEmpty()) {
124+
grpcStatus = grpcStatus.withDescription(status.getMessage());
125+
}
126+
StatusRuntimeException cause = grpcStatus.asRuntimeException();
127+
return ApiExceptionFactory.createException(
128+
cause, GrpcStatusCode.of(grpcStatus.getCode()), false);
129+
}
130+
117131
@Nullable
118132
private static <M extends Message> M findFirstPackedAny(Throwable t, Class<M> clazz) {
119133
if (t instanceof ApiException) {

google-cloud-storage/src/test/java/com/google/cloud/storage/ITBlobDescriptorFakeTest.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.cloud.storage.TestUtils.xxd;
2222
import static com.google.common.truth.Truth.assertThat;
2323
import static org.junit.Assert.assertThrows;
24+
import static org.junit.Assert.fail;
2425

2526
import com.google.api.core.ApiFuture;
2627
import com.google.api.gax.retrying.RetrySettings;
@@ -739,6 +740,58 @@ public void moreBytesReturnedThanRequested_onlyForwardsRequestedBytes() throws E
739740
runTestAgainstFakeServer(FakeStorage.from(db), RangeSpec.of(10, 20), expected);
740741
}
741742

743+
@Test
744+
public void validateReadRemovedFromStateWhenFailed() throws Exception {
745+
746+
BidiReadObjectRequest req2 = read(1, 10, 20);
747+
BidiReadObjectResponse res2 =
748+
BidiReadObjectResponse.newBuilder()
749+
.addObjectDataRanges(
750+
ObjectRangeData.newBuilder()
751+
.setReadRange(req2.getReadRangesList().get(0))
752+
.setChecksummedData(
753+
ChecksummedData.newBuilder()
754+
.setContent(ByteString.copyFrom(new byte[] {'A'}))
755+
// explicitly send a bad checksum to induce failure
756+
.setCrc32C(1)
757+
.build())
758+
.build())
759+
.build();
760+
761+
FakeStorage fake = FakeStorage.from(ImmutableMap.of(REQ_OPEN, RES_OPEN, req2, res2));
762+
763+
try (FakeServer fakeServer = FakeServer.of(fake);
764+
Storage storage =
765+
fakeServer
766+
.getGrpcStorageOptions()
767+
.toBuilder()
768+
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(1).build())
769+
.build()
770+
.getService()) {
771+
772+
BlobId id = BlobId.of("b", "o");
773+
ApiFuture<BlobDescriptor> futureObjectDescriptor = storage.getBlobDescriptor(id);
774+
775+
try (BlobDescriptor bd = futureObjectDescriptor.get(5, TimeUnit.SECONDS)) {
776+
BlobDescriptorImpl bdi = null;
777+
if (bd instanceof BlobDescriptorImpl) {
778+
bdi = (BlobDescriptorImpl) bd;
779+
} else {
780+
fail("unable to locate state for validation");
781+
}
782+
783+
ApiFuture<byte[]> future = bd.readRangeAsBytes(RangeSpec.of(10, 20));
784+
ExecutionException ee =
785+
assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.SECONDS));
786+
787+
assertThat(ee).hasCauseThat().isInstanceOf(UncheckedChecksumMismatchException.class);
788+
789+
BlobDescriptorStreamRead outstandingRead = bdi.state.getOutstandingRead(1L);
790+
assertThat(outstandingRead).isNull();
791+
}
792+
}
793+
}
794+
742795
private static void runTestAgainstFakeServer(
743796
FakeStorage fakeStorage, RangeSpec range, ChecksummedTestContent expected) throws Exception {
744797

google-cloud-storage/src/test/java/com/google/cloud/storage/it/runner/registry/BucketInfoShim.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,6 @@ public void start() {
5252

5353
@Override
5454
public void stop() {
55-
BucketCleaner.doCleanup(bucketInfo.getName(), s, ctrl);
55+
BucketCleaner.doCleanup(bucketInfo.getName(), s /*, ctrl*/);
5656
}
5757
}

0 commit comments

Comments
 (0)