Skip to content

Commit aeb621a

Browse files
committed
chore: make RetryContext an interface to allow for decoration
1 parent 3d2520d commit aeb621a

File tree

3 files changed

+221
-181
lines changed

3 files changed

+221
-181
lines changed
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
20+
import com.google.cloud.storage.Backoff.BackoffDuration;
21+
import com.google.cloud.storage.Backoff.BackoffResult;
22+
import com.google.cloud.storage.Backoff.BackoffResults;
23+
import com.google.cloud.storage.Backoff.Jitterer;
24+
import com.google.cloud.storage.Retrying.RetryingDependencies;
25+
import com.google.common.annotations.VisibleForTesting;
26+
import java.time.Duration;
27+
import java.util.LinkedList;
28+
import java.util.List;
29+
import java.util.concurrent.ScheduledExecutorService;
30+
import java.util.concurrent.ScheduledFuture;
31+
import java.util.concurrent.TimeUnit;
32+
import java.util.concurrent.locks.ReentrantLock;
33+
import org.checkerframework.checker.nullness.qual.Nullable;
34+
35+
@SuppressWarnings("SizeReplaceableByIsEmpty") // allow elimination of a method call and a negation
36+
final class DefaultRetryContext implements RetryContext {
37+
private final ScheduledExecutorService scheduledExecutorService;
38+
private final RetryingDependencies retryingDependencies;
39+
private final ResultRetryAlgorithm<?> algorithm;
40+
private final Backoff backoff;
41+
private final ReentrantLock lock;
42+
43+
private List<Throwable> failures;
44+
private long lastRecordedErrorNs;
45+
@Nullable private BackoffResult lastBackoffResult;
46+
@Nullable private ScheduledFuture<?> pendingBackoff;
47+
48+
DefaultRetryContext(
49+
ScheduledExecutorService scheduledExecutorService,
50+
RetryingDependencies retryingDependencies,
51+
ResultRetryAlgorithm<?> algorithm,
52+
Jitterer jitterer) {
53+
this.scheduledExecutorService = scheduledExecutorService;
54+
this.retryingDependencies = retryingDependencies;
55+
this.algorithm = algorithm;
56+
this.backoff =
57+
Backoff.from(retryingDependencies.getRetrySettings()).setJitterer(jitterer).build();
58+
this.lock = new ReentrantLock();
59+
this.failures = new LinkedList<>();
60+
this.lastRecordedErrorNs = retryingDependencies.getClock().nanoTime();
61+
this.lastBackoffResult = null;
62+
this.pendingBackoff = null;
63+
}
64+
65+
@Override
66+
public boolean inBackoff() {
67+
lock.lock();
68+
boolean b = pendingBackoff != null;
69+
try {
70+
return b;
71+
} finally {
72+
lock.unlock();
73+
}
74+
}
75+
76+
@Override
77+
public void reset() {
78+
lock.lock();
79+
try {
80+
if (failures.size() > 0) {
81+
failures = new LinkedList<>();
82+
}
83+
lastRecordedErrorNs = retryingDependencies.getClock().nanoTime();
84+
clearPendingBackoff();
85+
} finally {
86+
lock.unlock();
87+
}
88+
}
89+
90+
@VisibleForTesting
91+
void awaitBackoffComplete() {
92+
while (inBackoff()) {
93+
Thread.yield();
94+
}
95+
}
96+
97+
@Override
98+
public <T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailure<T> onFailure) {
99+
lock.lock();
100+
try {
101+
long now = retryingDependencies.getClock().nanoTime();
102+
Duration elapsed = Duration.ofNanos(now - lastRecordedErrorNs);
103+
if (pendingBackoff != null && pendingBackoff.isDone()) {
104+
pendingBackoff = null;
105+
lastBackoffResult = null;
106+
} else if (pendingBackoff != null) {
107+
pendingBackoff.cancel(true);
108+
backoff.backoffInterrupted(elapsed);
109+
String message =
110+
String.format(
111+
"Previous backoff interrupted by this error (previousBackoff: %s, elapsed: %s)",
112+
lastBackoffResult != null ? lastBackoffResult.errorString() : null, elapsed);
113+
t.addSuppressed(BackoffComment.of(message));
114+
}
115+
int failureCount = failures.size() + 1 /* include t in the count*/;
116+
int maxAttempts = retryingDependencies.getRetrySettings().getMaxAttempts();
117+
if (maxAttempts <= 0) {
118+
maxAttempts = Integer.MAX_VALUE;
119+
}
120+
boolean shouldRetry = algorithm.shouldRetry(t, null);
121+
Duration cumulativeBackoff = backoff.getCumulativeBackoff();
122+
BackoffResult nextBackoff = backoff.nextBackoff(elapsed);
123+
String msgPrefix = null;
124+
if (shouldRetry && failureCount >= maxAttempts) {
125+
msgPrefix = "Operation failed to complete within attempt budget";
126+
} else if (nextBackoff == BackoffResults.EXHAUSTED) {
127+
msgPrefix = "Operation failed to complete within backoff budget";
128+
} else if (!shouldRetry) {
129+
msgPrefix = "Unretryable error";
130+
}
131+
132+
if (msgPrefix == null) {
133+
t.addSuppressed(BackoffComment.fromResult(nextBackoff));
134+
failures.add(t);
135+
136+
BackoffDuration backoffDuration = (BackoffDuration) nextBackoff;
137+
138+
lastBackoffResult = nextBackoff;
139+
pendingBackoff =
140+
scheduledExecutorService.schedule(
141+
() -> {
142+
try {
143+
onSuccess.onSuccess();
144+
} finally {
145+
clearPendingBackoff();
146+
}
147+
},
148+
backoffDuration.getDuration().toNanos(),
149+
TimeUnit.NANOSECONDS);
150+
} else {
151+
String msg =
152+
String.format(
153+
"%s (attempts: %d%s, elapsed: %s, nextBackoff: %s%s)%s",
154+
msgPrefix,
155+
failureCount,
156+
maxAttempts == Integer.MAX_VALUE
157+
? ""
158+
: String.format(", maxAttempts: %d", maxAttempts),
159+
cumulativeBackoff,
160+
nextBackoff.errorString(),
161+
Durations.eq(backoff.getTimeout(), Durations.EFFECTIVE_INFINITY)
162+
? ""
163+
: ", timeout: " + backoff.getTimeout(),
164+
failures.isEmpty() ? "" : " previous failures follow in order of occurrence");
165+
t.addSuppressed(new RetryBudgetExhaustedComment(msg));
166+
for (Throwable failure : failures) {
167+
t.addSuppressed(failure);
168+
}
169+
onFailure.onFailure(t);
170+
}
171+
} finally {
172+
lock.unlock();
173+
}
174+
}
175+
176+
private void clearPendingBackoff() {
177+
lock.lock();
178+
try {
179+
if (pendingBackoff != null) {
180+
if (!pendingBackoff.isDone()) {
181+
pendingBackoff.cancel(true);
182+
}
183+
pendingBackoff = null;
184+
}
185+
if (lastBackoffResult != null) {
186+
lastBackoffResult = null;
187+
}
188+
} finally {
189+
lock.unlock();
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)