Skip to content

Commit ba70fd8

Browse files
committed
chore: integrate Backoff into RetryContext
Update RetryContex to incorporate Backoff. If a backoff is warranted, the next attempt will be scheduled on the provided ScheduledExecutorService after the backoff duration, so we are not blocking any thread with sleeping. Exhausted retry budget errors have been updated to include backoff/duration information.
1 parent 95d46a5 commit ba70fd8

File tree

5 files changed

+410
-76
lines changed

5 files changed

+410
-76
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
225225
.getExecutor();
226226
retryContextProvider =
227227
RetryContext.providerFrom(
228+
executor,
228229
getOptions(),
229230
new BasicResultRetryAlgorithm<Object>() {
230231
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/RetryContext.java

Lines changed: 178 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,69 +17,198 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.gax.retrying.ResultRetryAlgorithm;
20+
import com.google.cloud.storage.Backoff.BackoffDuration;
21+
import com.google.cloud.storage.Backoff.BackoffResult;
22+
import com.google.cloud.storage.Backoff.BackoffResults;
23+
import com.google.cloud.storage.Backoff.Jitterer;
2024
import com.google.cloud.storage.Retrying.RetryingDependencies;
25+
import com.google.common.annotations.VisibleForTesting;
26+
import java.time.Duration;
2127
import java.util.LinkedList;
2228
import java.util.List;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.ScheduledFuture;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.locks.ReentrantLock;
34+
import org.checkerframework.checker.nullness.qual.Nullable;
2335

36+
@SuppressWarnings("SizeReplaceableByIsEmpty") // allow elimination of a method call and a negation
2437
final class RetryContext {
25-
38+
private final ScheduledExecutorService scheduledExecutorService;
2639
private final RetryingDependencies retryingDependencies;
2740
private final ResultRetryAlgorithm<?> algorithm;
41+
private final Backoff backoff;
42+
private final ReentrantLock lock;
43+
2844
private List<Throwable> failures;
45+
private long lastRecordedErrorNs;
46+
@Nullable private BackoffResult lastBackoffResult;
47+
@Nullable private ScheduledFuture<?> pendingBackoff;
2948

3049
private RetryContext(
31-
RetryingDependencies retryingDependencies, ResultRetryAlgorithm<?> algorithm) {
50+
ScheduledExecutorService scheduledExecutorService,
51+
RetryingDependencies retryingDependencies,
52+
ResultRetryAlgorithm<?> algorithm,
53+
Jitterer jitterer) {
54+
this.scheduledExecutorService = scheduledExecutorService;
3255
this.retryingDependencies = retryingDependencies;
3356
this.algorithm = algorithm;
57+
this.backoff =
58+
Backoff.from(retryingDependencies.getRetrySettings()).setJitterer(jitterer).build();
59+
this.lock = new ReentrantLock();
60+
this.failures = new LinkedList<>();
61+
this.lastRecordedErrorNs = retryingDependencies.getClock().nanoTime();
62+
this.lastBackoffResult = null;
63+
this.pendingBackoff = null;
64+
}
65+
66+
boolean inBackoff() {
67+
lock.lock();
68+
boolean b = pendingBackoff != null;
69+
try {
70+
return b;
71+
} finally {
72+
lock.unlock();
73+
}
3474
}
3575

3676
void reset() {
37-
failures = new LinkedList<>();
77+
lock.lock();
78+
try {
79+
if (failures.size() > 0) {
80+
failures = new LinkedList<>();
81+
}
82+
lastRecordedErrorNs = retryingDependencies.getClock().nanoTime();
83+
clearPendingBackoff();
84+
} finally {
85+
lock.unlock();
86+
}
3887
}
3988

40-
public <T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailure<T> onFailure) {
41-
int failureCount = failures.size() + 1 /* include t in the count*/;
42-
int maxAttempts = retryingDependencies.getRetrySettings().getMaxAttempts();
43-
boolean shouldRetry = algorithm.shouldRetry(t, null);
44-
String msgPrefix = null;
45-
if (shouldRetry && failureCount >= maxAttempts) {
46-
msgPrefix = "Operation failed to complete within retry limit";
47-
} else if (!shouldRetry) {
48-
msgPrefix = "Unretryable error";
89+
@VisibleForTesting
90+
void awaitBackoffComplete() {
91+
while (inBackoff()) {
92+
Thread.yield();
93+
}
94+
}
95+
96+
<T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailure<T> onFailure) {
97+
lock.lock();
98+
try {
99+
long now = retryingDependencies.getClock().nanoTime();
100+
Duration elapsed = Duration.ofNanos(now - lastRecordedErrorNs);
101+
if (pendingBackoff != null && pendingBackoff.isDone()) {
102+
pendingBackoff = null;
103+
lastBackoffResult = null;
104+
} else if (pendingBackoff != null) {
105+
pendingBackoff.cancel(true);
106+
backoff.backoffInterrupted(elapsed);
107+
String message =
108+
String.format(
109+
"Previous backoff interrupted by this error (previousBackoff: %s, elapsed: %s)",
110+
lastBackoffResult != null ? lastBackoffResult.errorString() : null, elapsed);
111+
t.addSuppressed(BackoffComment.of(message));
112+
}
113+
int failureCount = failures.size() + 1 /* include t in the count*/;
114+
int maxAttempts = retryingDependencies.getRetrySettings().getMaxAttempts();
115+
if (maxAttempts <= 0) {
116+
maxAttempts = Integer.MAX_VALUE;
117+
}
118+
boolean shouldRetry = algorithm.shouldRetry(t, null);
119+
Duration elapsedOverall = backoff.getCumulativeBackoff().plus(elapsed);
120+
BackoffResult nextBackoff = backoff.nextBackoff(elapsed);
121+
String msgPrefix = null;
122+
if (shouldRetry && failureCount >= maxAttempts) {
123+
msgPrefix = "Operation failed to complete within attempt budget";
124+
} else if (nextBackoff == BackoffResults.EXHAUSTED) {
125+
msgPrefix = "Operation failed to complete within backoff budget";
126+
} else if (!shouldRetry) {
127+
msgPrefix = "Unretryable error";
128+
}
129+
130+
if (msgPrefix == null) {
131+
t.addSuppressed(BackoffComment.fromResult(nextBackoff));
132+
failures.add(t);
133+
134+
BackoffDuration backoffDuration = (BackoffDuration) nextBackoff;
135+
136+
lastBackoffResult = nextBackoff;
137+
pendingBackoff =
138+
scheduledExecutorService.schedule(
139+
() -> {
140+
try {
141+
onSuccess.onSuccess();
142+
} finally {
143+
clearPendingBackoff();
144+
}
145+
},
146+
backoffDuration.getDuration().toNanos(),
147+
TimeUnit.NANOSECONDS);
148+
} else {
149+
String msg =
150+
String.format(
151+
"%s (attempts: %d%s, elapsed: %s, nextBackoff: %s%s)%s",
152+
msgPrefix,
153+
failureCount,
154+
maxAttempts == Integer.MAX_VALUE
155+
? ""
156+
: String.format(", maxAttempts: %d", maxAttempts),
157+
elapsedOverall,
158+
nextBackoff.errorString(),
159+
Durations.eq(backoff.getTimeout(), Durations.EFFECTIVE_INFINITY)
160+
? ""
161+
: ", timeout: " + backoff.getTimeout(),
162+
failures.isEmpty() ? "" : " previous failures follow in order of occurrence");
163+
t.addSuppressed(new RetryBudgetExhaustedComment(msg));
164+
for (Throwable failure : failures) {
165+
t.addSuppressed(failure);
166+
}
167+
onFailure.onFailure(t);
168+
}
169+
} finally {
170+
lock.unlock();
49171
}
172+
}
50173

51-
if (msgPrefix == null) {
52-
failures.add(t);
53-
onSuccess.onSuccess();
54-
} else {
55-
String msg =
56-
String.format(
57-
"%s (attempts: %d, maxAttempts: %d)%s",
58-
msgPrefix,
59-
failureCount,
60-
maxAttempts,
61-
failures.isEmpty() ? "" : " previous failures follow in order of occurrence");
62-
t.addSuppressed(new RetryBudgetExhaustedComment(msg));
63-
for (Throwable failure : failures) {
64-
t.addSuppressed(failure);
174+
private void clearPendingBackoff() {
175+
lock.lock();
176+
try {
177+
if (pendingBackoff != null) {
178+
if (!pendingBackoff.isDone()) {
179+
pendingBackoff.cancel(true);
180+
}
181+
pendingBackoff = null;
182+
}
183+
if (lastBackoffResult != null) {
184+
lastBackoffResult = null;
65185
}
66-
onFailure.onFailure(t);
186+
} finally {
187+
lock.unlock();
67188
}
68189
}
69190

70191
static RetryContext of(
71-
RetryingDependencies retryingDependencies, ResultRetryAlgorithm<?> algorithm) {
72-
RetryContext retryContext = new RetryContext(retryingDependencies, algorithm);
73-
retryContext.reset();
74-
return retryContext;
192+
ScheduledExecutorService scheduledExecutorService,
193+
RetryingDependencies retryingDependencies,
194+
ResultRetryAlgorithm<?> algorithm,
195+
Jitterer jitterer) {
196+
return new RetryContext(scheduledExecutorService, retryingDependencies, algorithm, jitterer);
75197
}
76198

77199
static RetryContext neverRetry() {
78-
return new RetryContext(RetryingDependencies.attemptOnce(), Retrying.neverRetry());
200+
return new RetryContext(
201+
Executors.newSingleThreadScheduledExecutor(),
202+
RetryingDependencies.attemptOnce(),
203+
Retrying.neverRetry(),
204+
Jitterer.threadLocalRandom());
79205
}
80206

81-
static RetryContextProvider providerFrom(RetryingDependencies deps, ResultRetryAlgorithm<?> alg) {
82-
return () -> RetryContext.of(deps, alg);
207+
static RetryContextProvider providerFrom(
208+
ScheduledExecutorService scheduledExecutorService,
209+
RetryingDependencies deps,
210+
ResultRetryAlgorithm<?> alg) {
211+
return () -> RetryContext.of(scheduledExecutorService, deps, alg, Jitterer.threadLocalRandom());
83212
}
84213

85214
@FunctionalInterface
@@ -110,4 +239,19 @@ private RetryBudgetExhaustedComment(String comment) {
110239
super(comment, /*cause=*/ null, /*enableSuppression=*/ true, /*writableStackTrace=*/ false);
111240
}
112241
}
242+
243+
private static final class BackoffComment extends Throwable {
244+
private BackoffComment(String message) {
245+
super(message, /*cause=*/ null, /*enableSuppression=*/ true, /*writableStackTrace=*/ false);
246+
}
247+
248+
private static BackoffComment fromResult(BackoffResult result) {
249+
return new BackoffComment(
250+
String.format("backing off %s before next attempt", result.errorString()));
251+
}
252+
253+
private static BackoffComment of(String message) {
254+
return new BackoffComment(message);
255+
}
256+
}
113257
}

google-cloud-storage/src/main/java/com/google/cloud/storage/Retrying.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.cloud.RetryHelper.RetryHelperException;
2828
import com.google.cloud.storage.Conversions.Decoder;
2929
import com.google.cloud.storage.spi.v1.HttpRpcContext;
30+
import com.google.common.base.MoreObjects;
3031
import com.google.common.collect.ImmutableList;
3132
import com.google.common.collect.ImmutableMap;
3233
import java.util.UUID;
@@ -162,17 +163,40 @@ interface RetryingDependencies {
162163
ApiClock getClock();
163164

164165
static RetryingDependencies attemptOnce() {
165-
return new RetryingDependencies() {
166-
@Override
167-
public RetrySettings getRetrySettings() {
168-
return RetrySettings.newBuilder().setMaxAttempts(1).build();
169-
}
170-
171-
@Override
172-
public ApiClock getClock() {
173-
return NanoClock.getDefaultClock();
174-
}
175-
};
166+
return RetryingDependencies.simple(
167+
NanoClock.getDefaultClock(), RetrySettings.newBuilder().setMaxAttempts(1).build());
168+
}
169+
170+
static RetryingDependencies simple(ApiClock clock, RetrySettings retrySettings) {
171+
return new SimpleRetryingDependencies(clock, retrySettings);
172+
}
173+
}
174+
175+
private static final class SimpleRetryingDependencies implements RetryingDependencies {
176+
private final ApiClock clock;
177+
private final RetrySettings retrySettings;
178+
179+
private SimpleRetryingDependencies(ApiClock clock, RetrySettings retrySettings) {
180+
this.retrySettings = retrySettings;
181+
this.clock = clock;
182+
}
183+
184+
@Override
185+
public ApiClock getClock() {
186+
return clock;
187+
}
188+
189+
@Override
190+
public RetrySettings getRetrySettings() {
191+
return retrySettings;
192+
}
193+
194+
@Override
195+
public String toString() {
196+
return MoreObjects.toStringHelper(this)
197+
.add("clock", clock)
198+
.add("retrySettings", retrySettings)
199+
.toString();
176200
}
177201
}
178202
}

0 commit comments

Comments
 (0)