Skip to content

Commit 98dbb02

Browse files
committed
chore: refactor RetryContext to accept an OnSuccess and OnFailure callback along with the Throwable to recordFailure
This is partly cleanup and partly pre-work. The cleanup part is to get rid of the extra exception throwing/catching The pre-work part is in preparation for when retries will actually backoff, the OnSuccess can be executed after the backoff interval.
1 parent cbc22f2 commit 98dbb02

File tree

4 files changed

+120
-85
lines changed

4 files changed

+120
-85
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 31 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import com.google.api.core.SettableApiFuture;
2121
import com.google.api.gax.grpc.GrpcCallContext;
2222
import com.google.api.gax.grpc.GrpcStatusCode;
23+
import com.google.api.gax.rpc.ApiException;
2324
import com.google.api.gax.rpc.ApiExceptionFactory;
2425
import com.google.api.gax.rpc.ClientStream;
2526
import com.google.api.gax.rpc.ResponseObserver;
2627
import com.google.api.gax.rpc.StreamController;
2728
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2829
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
30+
import com.google.cloud.storage.RetryContext.OnSuccess;
2931
import com.google.common.base.Preconditions;
3032
import com.google.protobuf.ByteString;
3133
import com.google.rpc.Status;
@@ -261,19 +263,7 @@ public void onResponse(BidiReadObjectResponse response) {
261263
// happen on a non-io thread
262264
Hasher.enabled().validate(Crc32cValue.of(crc32C), content);
263265
} catch (IOException e) {
264-
try {
265-
read.recordError(e);
266-
267-
//noinspection resource
268-
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
269-
BidiReadObjectRequest requestWithNewReadId =
270-
BidiReadObjectRequest.newBuilder()
271-
.addReadRanges(readWithNewId.makeReadRange())
272-
.build();
273-
requestStream.send(requestWithNewReadId);
274-
} catch (Throwable t) {
275-
read.fail(t);
276-
}
266+
read.recordError(e, restartReadFromCurrentOffset(id), read::unsafeFail);
277267
continue;
278268
}
279269

@@ -307,39 +297,32 @@ public void onResponse(BidiReadObjectResponse response) {
307297
.getContent()
308298
.substring(skip));
309299
read.accept(childRef);
310-
//noinspection resource
311-
read = state.assignNewReadId(id);
312-
if (read.getReadCursor().hasRemaining()) {
313-
BidiReadObjectRequest requestWithNewReadId =
314-
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
315-
requestStream.send(requestWithNewReadId);
316-
}
300+
ApiException apiException =
301+
ApiExceptionFactory.createException(
302+
String.format("position = %d, readRange.read_offset = %d", position, begin),
303+
null,
304+
GrpcStatusCode.of(Code.OUT_OF_RANGE),
305+
true);
306+
read.recordError(apiException, restartReadFromCurrentOffset(id), read::unsafeFail);
307+
continue;
317308
} else {
318-
try {
319-
read.recordError(
320-
ApiExceptionFactory.createException(
321-
String.format("position = %d, readRange.read_offset = %d", position, begin),
322-
null,
323-
GrpcStatusCode.of(Code.OUT_OF_RANGE),
324-
true));
325-
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
326-
BidiReadObjectRequest requestWithNewReadId =
327-
BidiReadObjectRequest.newBuilder()
328-
.addReadRanges(readWithNewId.makeReadRange())
329-
.build();
330-
requestStream.send(requestWithNewReadId);
331-
} catch (Throwable e) {
332-
read.fail(e);
333-
}
309+
ApiException apiException =
310+
ApiExceptionFactory.createException(
311+
String.format("position = %d, readRange.read_offset = %d", position, begin),
312+
null,
313+
GrpcStatusCode.of(Code.OUT_OF_RANGE),
314+
true);
315+
read.recordError(apiException, restartReadFromCurrentOffset(id), read::unsafeFail);
316+
continue;
334317
}
318+
335319
if (d.getRangeEnd() && !read.getReadCursor().hasRemaining()) {
336-
final BlobDescriptorStreamRead finalRead = read;
337320
// invoke eof on exec, the resolving future could have a downstream callback
338321
// that we don't want to block this grpc thread
339322
executor.execute(
340323
StorageException.liftToRunnable(
341324
() -> {
342-
finalRead.eof();
325+
read.eof();
343326
// don't remove the outstanding read until the future has been resolved
344327
state.removeOutstandingRead(id);
345328
}));
@@ -379,6 +362,16 @@ public void onError(Throwable t) {
379362
reset();
380363
}
381364

365+
private OnSuccess restartReadFromCurrentOffset(long id) {
366+
return () -> {
367+
//noinspection resource
368+
BlobDescriptorStreamRead readWithNewId = state.assignNewReadId(id);
369+
BidiReadObjectRequest requestWithNewReadId =
370+
BidiReadObjectRequest.newBuilder().addReadRanges(readWithNewId.makeReadRange()).build();
371+
requestStream.send(requestWithNewReadId);
372+
};
373+
}
374+
382375
@Override
383376
public void onComplete() {}
384377
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import com.google.api.gax.rpc.ApiExceptionFactory;
2323
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2424
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
25+
import com.google.cloud.storage.RetryContext.OnFailure;
26+
import com.google.cloud.storage.RetryContext.OnSuccess;
2527
import com.google.protobuf.ByteString;
2628
import com.google.rpc.Status;
2729
import com.google.storage.v2.ReadRange;
@@ -78,6 +80,15 @@ final void fail(Status status) throws IOException {
7880
fail(apiException);
7981
}
8082

83+
final void unsafeFail(Throwable t) {
84+
try {
85+
fail(t);
86+
} catch (IOException e) {
87+
// todo: better exception than this
88+
throw new RuntimeException(e);
89+
}
90+
}
91+
8192
abstract void fail(Throwable t) throws IOException;
8293

8394
abstract BlobDescriptorStreamRead withNewReadId(long newReadId);
@@ -99,7 +110,7 @@ public void close() throws IOException {
99110
}
100111
}
101112

102-
abstract void recordError(Throwable e);
113+
abstract void recordError(Throwable t, OnSuccess onSuccess, OnFailure onFailure);
103114

104115
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
105116
long readId,
@@ -156,8 +167,8 @@ void fail(Throwable t) throws IOException {
156167
}
157168

158169
@Override
159-
public void recordError(Throwable e) {
160-
retryContext.recordError(e);
170+
void recordError(Throwable t, OnSuccess onSuccess, OnFailure onFailure) {
171+
retryContext.recordError(t, onSuccess, onFailure);
161172
}
162173
}
163174

google-cloud-storage/src/main/java/com/google/cloud/storage/RetryContext.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ void reset() {
4242
failures = new LinkedList<>();
4343
}
4444

45-
public void recordError(Throwable e) {
46-
int failureCount = failures.size() + 1 /* include e */;
45+
public void recordError(Throwable t, OnSuccess onSuccess, OnFailure onFailure) {
46+
int failureCount = failures.size() + 1 /* include t in the count*/;
4747
int maxAttempts = retryingDependencies.getRetrySettings().getMaxAttempts();
48-
boolean shouldRetry = algorithm.shouldRetry(e, null);
48+
boolean shouldRetry = algorithm.shouldRetry(t, null);
4949
String msgPrefix = null;
5050
if (shouldRetry && failureCount >= maxAttempts) {
5151
msgPrefix = "Operation failed to complete within retry limit";
@@ -54,15 +54,16 @@ public void recordError(Throwable e) {
5454
}
5555

5656
if (msgPrefix == null) {
57-
failures.add(e);
57+
failures.add(t);
58+
onSuccess.onSuccess();
5859
} else {
5960
String msg =
6061
String.format("%s (attempts: %d, maxAttempts: %d)", msgPrefix, failureCount, maxAttempts);
61-
ApiException cancelled = ApiExceptionFactory.createException(msg, e, CANCELLED, false);
62+
ApiException cancelled = ApiExceptionFactory.createException(msg, t, CANCELLED, false);
6263
for (Throwable failure : failures) {
6364
cancelled.addSuppressed(failure);
6465
}
65-
throw cancelled;
66+
onFailure.onFailure(cancelled);
6667
}
6768
}
6869

@@ -83,7 +84,16 @@ static RetryContextProvider providerFrom(RetryingDependencies deps, ResultRetryA
8384

8485
@FunctionalInterface
8586
interface RetryContextProvider {
86-
8787
RetryContext create();
8888
}
89+
90+
@FunctionalInterface
91+
interface OnSuccess {
92+
void onSuccess();
93+
}
94+
95+
@FunctionalInterface
96+
interface OnFailure {
97+
void onFailure(Throwable t);
98+
}
8999
}

google-cloud-storage/src/test/java/com/google/cloud/storage/RetryContextTest.java

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.storage;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20-
import static org.junit.Assert.assertThrows;
2120

2221
import com.google.api.core.ApiClock;
2322
import com.google.api.core.NanoClock;
@@ -26,13 +25,15 @@
2625
import com.google.api.gax.retrying.RetrySettings;
2726
import com.google.api.gax.rpc.ApiException;
2827
import com.google.api.gax.rpc.ApiExceptionFactory;
29-
import com.google.api.gax.rpc.CancelledException;
3028
import com.google.api.gax.rpc.ResourceExhaustedException;
29+
import com.google.cloud.storage.RetryContext.OnFailure;
30+
import com.google.cloud.storage.RetryContext.OnSuccess;
3131
import com.google.cloud.storage.Retrying.RetryingDependencies;
3232
import io.grpc.Status.Code;
3333
import org.junit.Test;
3434

3535
public final class RetryContextTest {
36+
private static final OnSuccess NOOP = () -> {};
3637

3738
private static final Throwable T1 = apiException(Code.UNAVAILABLE, "{unavailable}");
3839
private static final Throwable T2 = apiException(Code.INTERNAL, "{internal}");
@@ -42,43 +43,39 @@ public final class RetryContextTest {
4243
public void retriable_cancelledException_when_maxAttemptBudget_consumed() {
4344
RetryContext ctx = RetryContext.of(maxAttempts(1), Retrying.alwaysRetry());
4445

45-
CancelledException cancelled =
46-
assertThrows(CancelledException.class, () -> ctx.recordError(T1));
47-
48-
assertThat(cancelled).hasCauseThat().isEqualTo(T1);
46+
ctx.recordError(T1, failOnSuccess(), actual -> assertThat(actual).hasCauseThat().isEqualTo(T1));
4947
}
5048

5149
@Test
5250
public void retriable_maxAttemptBudget_still_available() {
5351
RetryContext ctx = RetryContext.of(maxAttempts(2), Retrying.alwaysRetry());
5452

55-
ctx.recordError(T1);
53+
ctx.recordError(T1, NOOP, failOnFailure());
5654
}
5755

5856
@Test
5957
public void
6058
retriable_cancelledException_when_maxAttemptBudget_multipleAttempts_previousErrorsIncludedAsSuppressed() {
6159
RetryContext ctx = RetryContext.of(maxAttempts(3), Retrying.alwaysRetry());
6260

63-
ctx.recordError(T1);
64-
ctx.recordError(T2);
65-
66-
CancelledException cancelled =
67-
assertThrows(CancelledException.class, () -> ctx.recordError(T3));
68-
69-
assertThat(cancelled).hasCauseThat().isEqualTo(T3);
70-
Throwable[] suppressed = cancelled.getSuppressed();
71-
assertThat(suppressed).asList().containsExactly(T1, T2);
61+
ctx.recordError(T1, NOOP, failOnFailure());
62+
ctx.recordError(T2, NOOP, failOnFailure());
63+
64+
ctx.recordError(
65+
T3,
66+
failOnSuccess(),
67+
actual -> {
68+
assertThat(actual).hasCauseThat().isEqualTo(T3);
69+
Throwable[] suppressed = actual.getSuppressed();
70+
assertThat(suppressed).asList().containsExactly(T1, T2);
71+
});
7272
}
7373

7474
@Test
7575
public void nonRetriable_cancelledException_regardlessOfAttemptBudget() {
7676
RetryContext ctx = RetryContext.of(maxAttempts(3), Retrying.neverRetry());
7777

78-
CancelledException cancelled =
79-
assertThrows(CancelledException.class, () -> ctx.recordError(T1));
80-
81-
assertThat(cancelled).hasCauseThat().isEqualTo(T1);
78+
ctx.recordError(T1, failOnSuccess(), actual -> assertThat(actual).hasCauseThat().isEqualTo(T1));
8279
}
8380

8481
@Test
@@ -94,15 +91,17 @@ public boolean shouldRetry(Throwable previousThrowable, Object previousResponse)
9491
}
9592
});
9693

97-
ctx.recordError(T1);
98-
ctx.recordError(T2);
99-
100-
CancelledException cancelled =
101-
assertThrows(CancelledException.class, () -> ctx.recordError(T3));
102-
103-
assertThat(cancelled).hasCauseThat().isEqualTo(T3);
104-
Throwable[] suppressed = cancelled.getSuppressed();
105-
assertThat(suppressed).asList().containsExactly(T1, T2);
94+
ctx.recordError(T1, NOOP, failOnFailure());
95+
ctx.recordError(T2, NOOP, failOnFailure());
96+
97+
ctx.recordError(
98+
T3,
99+
failOnSuccess(),
100+
actual -> {
101+
assertThat(actual).hasCauseThat().isEqualTo(T3);
102+
Throwable[] suppressed = actual.getSuppressed();
103+
assertThat(suppressed).asList().containsExactly(T1, T2);
104+
});
106105
}
107106

108107
@Test
@@ -117,16 +116,18 @@ public boolean shouldRetry(Throwable previousThrowable, Object previousResponse)
117116
}
118117
});
119118

120-
ctx.recordError(T1);
121-
ctx.recordError(T2);
119+
ctx.recordError(T1, NOOP, failOnFailure());
120+
ctx.recordError(T2, NOOP, failOnFailure());
122121
ctx.reset();
123122

124-
CancelledException cancelled =
125-
assertThrows(CancelledException.class, () -> ctx.recordError(T3));
126-
127-
assertThat(cancelled).hasCauseThat().isEqualTo(T3);
128-
Throwable[] suppressed = cancelled.getSuppressed();
129-
assertThat(suppressed).asList().isEmpty();
123+
ctx.recordError(
124+
T3,
125+
failOnSuccess(),
126+
actual -> {
127+
assertThat(actual).hasCauseThat().isEqualTo(T3);
128+
Throwable[] suppressed = actual.getSuppressed();
129+
assertThat(suppressed).asList().isEmpty();
130+
});
130131
}
131132

132133
private static ApiException apiException(Code code, String message) {
@@ -139,6 +140,20 @@ private static MaxAttemptRetryingDependencies maxAttempts(int maxAttempts) {
139140
NanoClock.getDefaultClock());
140141
}
141142

143+
private static OnFailure failOnFailure() {
144+
InvocationTracer invocationTracer = new InvocationTracer("Unexpected onFailure invocation");
145+
return t -> {
146+
throw invocationTracer;
147+
};
148+
}
149+
150+
private static OnSuccess failOnSuccess() {
151+
InvocationTracer invocationTracer = new InvocationTracer("Unexpected onSuccess invocation");
152+
return () -> {
153+
throw invocationTracer;
154+
};
155+
}
156+
142157
private static final class MaxAttemptRetryingDependencies implements RetryingDependencies {
143158
private final RetrySettings settings;
144159
private final ApiClock clock;
@@ -158,4 +173,10 @@ public ApiClock getClock() {
158173
return clock;
159174
}
160175
}
176+
177+
private static final class InvocationTracer extends RuntimeException {
178+
private InvocationTracer(String message) {
179+
super(message);
180+
}
181+
}
161182
}

0 commit comments

Comments
 (0)