Skip to content

Commit a7a8945

Browse files
committed
chore: add retry tracking to BlobDescriptorStream checksum mismatch
1 parent 26db9e3 commit a7a8945

File tree

8 files changed

+247
-62
lines changed

8 files changed

+247
-62
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2626
import com.google.cloud.storage.BlobDescriptorStreamRead.AccumulatingRead;
2727
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
28+
import com.google.cloud.storage.RetryContext.RetryContextProvider;
2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.math.LongMath;
3031
import com.google.protobuf.ByteString;
@@ -40,11 +41,16 @@ final class BlobDescriptorImpl implements BlobDescriptor {
4041
private final BlobDescriptorStream stream;
4142
private final BlobDescriptorState state;
4243
private final BlobInfo info;
44+
private final RetryContextProvider retryContextProvider;
4345

44-
private BlobDescriptorImpl(BlobDescriptorStream stream, BlobDescriptorState state) {
46+
private BlobDescriptorImpl(
47+
BlobDescriptorStream stream,
48+
BlobDescriptorState state,
49+
RetryContextProvider retryContextProvider) {
4550
this.stream = stream;
4651
this.state = state;
4752
this.info = Conversions.grpc().blobInfo().decode(state.getMetadata());
53+
this.retryContextProvider = retryContextProvider;
4854
}
4955

5056
@Override
@@ -56,7 +62,8 @@ public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
5662
}
5763
SettableApiFuture<byte[]> future = SettableApiFuture.create();
5864
AccumulatingRead<byte[]> read =
59-
BlobDescriptorStreamRead.createByteArrayAccumulatingRead(readId, readCursor, future);
65+
BlobDescriptorStreamRead.createByteArrayAccumulatingRead(
66+
readId, readCursor, retryContextProvider.create(), future);
6067
BidiReadObjectRequest request =
6168
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
6269
state.putOutstandingRead(readId, read);
@@ -73,7 +80,7 @@ public ApiFuture<DisposableByteString> readRangeAsByteString(RangeSpec range) {
7380
SettableApiFuture<DisposableByteString> future = SettableApiFuture.create();
7481
AccumulatingRead<DisposableByteString> read =
7582
BlobDescriptorStreamRead.createZeroCopyByteStringAccumulatingRead(
76-
readId, readCursor, future);
83+
readId, readCursor, future, retryContextProvider.create());
7784
BidiReadObjectRequest request =
7885
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
7986
state.putOutstandingRead(readId, read);
@@ -115,13 +122,17 @@ static ApiFuture<BlobDescriptor> create(
115122
BidiReadObjectRequest openRequest,
116123
GrpcCallContext context,
117124
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
118-
ScheduledExecutorService executor) {
125+
ScheduledExecutorService executor,
126+
RetryContextProvider retryContextProvider) {
119127
BlobDescriptorState state = new BlobDescriptorState(openRequest);
120128

121129
BlobDescriptorStream stream = BlobDescriptorStream.create(executor, callable, context, state);
122130

123131
ApiFuture<BlobDescriptor> blobDescriptorFuture =
124-
ApiFutures.transform(stream, nowOpen -> new BlobDescriptorImpl(stream, state), executor);
132+
ApiFutures.transform(
133+
stream,
134+
nowOpen -> new BlobDescriptorImpl(stream, state, retryContextProvider),
135+
executor);
125136
stream.send(openRequest);
126137
return StorageException.coalesceAsync(blobDescriptorFuture);
127138
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,14 +259,19 @@ public void onResponse(BidiReadObjectResponse response) {
259259
// happen on a non-io thread
260260
Hasher.enabled().validate(Crc32cValue.of(crc32C), content);
261261
} catch (IOException e) {
262-
//noinspection resource
263-
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
264-
// todo: record failure for read
265-
BidiReadObjectRequest requestWithNewReadId =
266-
BidiReadObjectRequest.newBuilder()
267-
.addReadRanges(readWithNewId.makeReadRange())
268-
.build();
269-
requestStream.send(requestWithNewReadId);
262+
try {
263+
read.recordError(e);
264+
265+
//noinspection resource
266+
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
267+
BidiReadObjectRequest requestWithNewReadId =
268+
BidiReadObjectRequest.newBuilder()
269+
.addReadRanges(readWithNewId.makeReadRange())
270+
.build();
271+
requestStream.send(requestWithNewReadId);
272+
} catch (Throwable t) {
273+
read.fail(t);
274+
}
270275
continue;
271276
}
272277

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 92 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,23 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3737
protected final long readId;
3838
protected final ReadCursor readCursor;
3939
protected final List<ChildRef> childRefs;
40+
protected final RetryContext retryContext;
4041
protected boolean closed;
4142

42-
private BlobDescriptorStreamRead(long readId, ReadCursor readCursor) {
43-
this(readId, readCursor, Collections.synchronizedList(new ArrayList<>()), false);
43+
private BlobDescriptorStreamRead(long readId, ReadCursor readCursor, RetryContext retryContext) {
44+
this(readId, readCursor, Collections.synchronizedList(new ArrayList<>()), retryContext, false);
4445
}
4546

4647
private BlobDescriptorStreamRead(
47-
long readId, ReadCursor readCursor, List<ChildRef> childRefs, boolean closed) {
48+
long readId,
49+
ReadCursor readCursor,
50+
List<ChildRef> childRefs,
51+
RetryContext retryContext,
52+
boolean closed) {
4853
this.readId = readId;
4954
this.readCursor = readCursor;
5055
this.childRefs = childRefs;
56+
this.retryContext = retryContext;
5157
this.closed = closed;
5258
}
5359

@@ -61,7 +67,18 @@ ReadCursor getReadCursor() {
6167

6268
abstract void eof() throws IOException;
6369

64-
abstract void fail(Status status) throws IOException;
70+
final void fail(Status status) throws IOException {
71+
io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode());
72+
if (!status.getMessage().isEmpty()) {
73+
grpcStatus = grpcStatus.withDescription(status.getMessage());
74+
}
75+
StatusRuntimeException cause = grpcStatus.asRuntimeException();
76+
ApiException apiException =
77+
ApiExceptionFactory.createException(cause, GrpcStatusCode.of(grpcStatus.getCode()), false);
78+
fail(apiException);
79+
}
80+
81+
abstract void fail(Throwable t) throws IOException;
6582

6683
abstract BlobDescriptorStreamRead withNewReadId(long newReadId);
6784

@@ -76,38 +93,51 @@ final ReadRange makeReadRange() {
7693
@Override
7794
public void close() throws IOException {
7895
if (!closed) {
96+
retryContext.reset();
7997
closed = true;
8098
GrpcUtils.closeAll(childRefs);
8199
}
82100
}
83101

84102
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
85-
long readId, ReadCursor readCursor, SettableApiFuture<byte[]> complete) {
86-
return new ByteArrayAccumulatingRead(readId, readCursor, complete);
103+
long readId,
104+
ReadCursor readCursor,
105+
RetryContext retryContext,
106+
SettableApiFuture<byte[]> complete) {
107+
return new ByteArrayAccumulatingRead(readId, readCursor, retryContext, complete);
87108
}
88109

89110
static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRead(
90-
long readId, ReadCursor readCursor, SettableApiFuture<DisposableByteString> complete) {
91-
return new ZeroCopyByteStringAccumulatingRead(readId, readCursor, complete);
111+
long readId,
112+
ReadCursor readCursor,
113+
SettableApiFuture<DisposableByteString> complete,
114+
RetryContext retryContext) {
115+
return new ZeroCopyByteStringAccumulatingRead(readId, readCursor, retryContext, complete);
92116
}
93117

118+
public abstract void recordError(Throwable e);
119+
94120
/** Base class of a read that will accumulate before completing by resolving a future */
95121
abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead {
96122
protected final SettableApiFuture<Result> complete;
97123

98124
private AccumulatingRead(
99-
long readId, ReadCursor readCursor, SettableApiFuture<Result> complete) {
100-
super(readId, readCursor);
125+
long readId,
126+
ReadCursor readCursor,
127+
RetryContext retryContext,
128+
SettableApiFuture<Result> complete) {
129+
super(readId, readCursor, retryContext);
101130
this.complete = complete;
102131
}
103132

104133
private AccumulatingRead(
105134
long readId,
106135
ReadCursor readCursor,
107136
List<ChildRef> childRefs,
137+
RetryContext retryContext,
108138
boolean closed,
109139
SettableApiFuture<Result> complete) {
110-
super(readId, readCursor, childRefs, closed);
140+
super(readId, readCursor, childRefs, retryContext, closed);
111141
this.complete = complete;
112142
}
113143

@@ -117,16 +147,17 @@ boolean acceptingBytes() {
117147
}
118148

119149
@Override
120-
void fail(Status status) throws IOException {
121-
io.grpc.Status grpcStatus = io.grpc.Status.fromCodeValue(status.getCode());
122-
if (!status.getMessage().isEmpty()) {
123-
grpcStatus = grpcStatus.withDescription(status.getMessage());
150+
void fail(Throwable t) throws IOException {
151+
try {
152+
complete.setException(t);
153+
} finally {
154+
close();
124155
}
125-
StatusRuntimeException cause = grpcStatus.asRuntimeException();
126-
ApiException apiException =
127-
ApiExceptionFactory.createException(
128-
cause, GrpcStatusCode.of(grpcStatus.getCode()), false);
129-
complete.setException(apiException);
156+
}
157+
158+
@Override
159+
public void recordError(Throwable e) {
160+
retryContext.recordError(e);
130161
}
131162
}
132163

@@ -135,41 +166,51 @@ void fail(Status status) throws IOException {
135166
* java.nio.channels.ReadableByteChannel})
136167
*/
137168
abstract static class StreamingRead extends BlobDescriptorStreamRead {
138-
private StreamingRead(long readId, long readOffset, long readLimit) {
139-
super(readId, new ReadCursor(readOffset, readOffset + readLimit));
169+
private StreamingRead(long readId, long readOffset, long readLimit, RetryContext retryContext) {
170+
super(readId, new ReadCursor(readOffset, readOffset + readLimit), retryContext);
140171
}
141172

142-
public StreamingRead(
143-
long readId, ReadCursor readCursor, List<ChildRef> childRefs, boolean closed) {
144-
super(readId, readCursor, childRefs, closed);
173+
private StreamingRead(
174+
long readId,
175+
ReadCursor readCursor,
176+
List<ChildRef> childRefs,
177+
RetryContext retryContext,
178+
boolean closed) {
179+
super(readId, readCursor, childRefs, retryContext, closed);
145180
}
146181
}
147182

148183
static final class ByteArrayAccumulatingRead extends AccumulatingRead<byte[]> {
149184

150185
private ByteArrayAccumulatingRead(
151-
long readId, ReadCursor readCursor, SettableApiFuture<byte[]> complete) {
152-
super(readId, readCursor, complete);
186+
long readId,
187+
ReadCursor readCursor,
188+
RetryContext retryContext,
189+
SettableApiFuture<byte[]> complete) {
190+
super(readId, readCursor, retryContext, complete);
153191
}
154192

155193
private ByteArrayAccumulatingRead(
156194
long readId,
157195
ReadCursor readCursor,
158196
List<ChildRef> childRefs,
197+
RetryContext retryContext,
159198
boolean closed,
160199
SettableApiFuture<byte[]> complete) {
161-
super(readId, readCursor, childRefs, closed, complete);
200+
super(readId, readCursor, childRefs, retryContext, closed, complete);
162201
}
163202

164203
@Override
165204
void accept(ChildRef childRef) throws IOException {
205+
retryContext.reset();
166206
int size = childRef.byteString().size();
167207
childRefs.add(childRef);
168208
readCursor.advance(size);
169209
}
170210

171211
@Override
172212
void eof() throws IOException {
213+
retryContext.reset();
173214
try {
174215
ByteString base = ByteString.empty();
175216
for (ChildRef ref : childRefs) {
@@ -183,7 +224,8 @@ void eof() throws IOException {
183224

184225
@Override
185226
ByteArrayAccumulatingRead withNewReadId(long newReadId) {
186-
return new ByteArrayAccumulatingRead(newReadId, readCursor, childRefs, closed, complete);
227+
return new ByteArrayAccumulatingRead(
228+
newReadId, readCursor, childRefs, retryContext, closed, complete);
187229
}
188230
}
189231

@@ -193,8 +235,23 @@ static final class ZeroCopyByteStringAccumulatingRead
193235
private volatile ByteString byteString;
194236

195237
private ZeroCopyByteStringAccumulatingRead(
196-
long readId, ReadCursor readCursor, SettableApiFuture<DisposableByteString> complete) {
197-
super(readId, readCursor, complete);
238+
long readId,
239+
ReadCursor readCursor,
240+
RetryContext retryContext,
241+
SettableApiFuture<DisposableByteString> complete) {
242+
super(readId, readCursor, retryContext, complete);
243+
}
244+
245+
public ZeroCopyByteStringAccumulatingRead(
246+
long readId,
247+
ReadCursor readCursor,
248+
List<ChildRef> childRefs,
249+
RetryContext retryContext,
250+
boolean closed,
251+
SettableApiFuture<DisposableByteString> complete,
252+
ByteString byteString) {
253+
super(readId, readCursor, childRefs, retryContext, closed, complete);
254+
this.byteString = byteString;
198255
}
199256

200257
@Override
@@ -204,13 +261,15 @@ public ByteString byteString() {
204261

205262
@Override
206263
void accept(ChildRef childRef) throws IOException {
264+
retryContext.reset();
207265
int size = childRef.byteString().size();
208266
childRefs.add(childRef);
209267
readCursor.advance(size);
210268
}
211269

212270
@Override
213271
void eof() throws IOException {
272+
retryContext.reset();
214273
ByteString base = ByteString.empty();
215274
for (ChildRef ref : childRefs) {
216275
base = base.concat(ref.byteString());
@@ -221,7 +280,8 @@ void eof() throws IOException {
221280

222281
@Override
223282
ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
224-
return new ZeroCopyByteStringAccumulatingRead(newReadId, readCursor, complete);
283+
return new ZeroCopyByteStringAccumulatingRead(
284+
newReadId, readCursor, childRefs, retryContext, closed, complete, byteString);
225285
}
226286
}
227287
}

0 commit comments

Comments
 (0)