Skip to content

Commit b1d127d

Browse files
werktcopybara-github
authored andcommitted
Done operations must be reexecuted
Any operation with done == true as reported by the server is not expected to change its result on subsequent waitExecution calls. To properly retry, this action must be reexecuted, if it was truly transient, to achieve a definitive result. Submit a transient status for retry, disallow special behaviors for NOT_FOUND as covered by done observation, and consider method type when handling operation streams. Closes #18943. PiperOrigin-RevId: 548680656 Change-Id: Ib2c9887ead1fbd3de97761db6e8b4077783ad03c
1 parent b6c986e commit b1d127d

4 files changed

Lines changed: 48 additions & 34 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ ExecuteResponse execute() throws IOException {
179179

180180
try {
181181
Iterator<Operation> operationStream = executeFunction.apply(request);
182-
return handleOperationStream(operationStream);
182+
return handleOperationStream(operationStream, /* waitExecution= */ false);
183183
} catch (Throwable e) {
184184
// If lastOperation is not null, we know the execution request is accepted by the server. In
185185
// this case, we will fallback to WaitExecution() loop when the stream is broken.
@@ -199,34 +199,43 @@ ExecuteResponse waitExecution() throws IOException {
199199
WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
200200
try {
201201
Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
202-
return handleOperationStream(operationStream);
202+
return handleOperationStream(operationStream, /* waitExecution= */ true);
203+
} catch (StatusRuntimeException e) {
204+
throw new IOException(e);
203205
} catch (Throwable e) {
204-
// A NOT_FOUND error means Operation was lost on the server, retry Execute().
205-
//
206-
// However, we only retry Execute() if executeBackoff should retry. Also increase the retry
207-
// counter at the same time (done by nextDelayMillis()).
208-
if (e instanceof StatusRuntimeException) {
209-
StatusRuntimeException sre = (StatusRuntimeException) e;
210-
if (sre.getStatus().getCode() == Code.NOT_FOUND
211-
&& executeBackoff.nextDelayMillis(sre) >= 0) {
212-
lastOperation = null;
213-
return null;
214-
}
215-
}
206+
lastOperation = null;
216207
throw new IOException(e);
217208
}
218209
}
219210

220211
/** Process a stream of operations from Execute() or WaitExecution(). */
221212
@Nullable
222-
ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throws IOException {
213+
ExecuteResponse handleOperationStream(
214+
Iterator<Operation> operationStream, boolean waitExecution) throws IOException {
223215
try {
224216
while (operationStream.hasNext()) {
225217
Operation operation = operationStream.next();
226-
ExecuteResponse response = extractResponseOrThrowIfError(operation);
227218

228-
// At this point, we successfully received a response that is not an error.
229-
lastOperation = operation;
219+
// Either done or should be repeated
220+
lastOperation = operation.getDone() ? null : operation;
221+
222+
ExecuteResponse response;
223+
try {
224+
response = extractResponseOrThrowIfError(operation);
225+
} catch (StatusRuntimeException e) {
226+
// An operation error means Operation has been terminally completed, retry Execute().
227+
//
228+
// However, we only retry Execute() if executeBackoff should retry. Also increase the
229+
// retry
230+
// counter at the same time (done by nextDelayMillis()).
231+
if (waitExecution
232+
&& (retrier.isRetriable(e) || e.getStatus().getCode() == Code.NOT_FOUND)
233+
&& executeBackoff.nextDelayMillis(e) >= 0) {
234+
lastOperation = null;
235+
return null;
236+
}
237+
throw e;
238+
}
230239

231240
// We don't want to reset executeBackoff since if there is an error:
232241
// 1. If happened before we received a first response, we want to ensure the retry

src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.google.devtools.build.lib.remote.common.OperationObserver;
2727
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
2828
import com.google.devtools.build.lib.remote.util.TestUtils;
29-
import com.google.rpc.Code;
3029
import io.grpc.Status;
3130
import java.io.IOException;
3231
import java.util.concurrent.Executors;
@@ -90,7 +89,9 @@ public void executeRemotely_executeAndRetryWait_forever() throws Exception {
9089
public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() {
9190
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
9291
for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) {
93-
executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
92+
executionService
93+
.whenWaitExecution(DUMMY_REQUEST)
94+
.thenError(Status.UNAVAILABLE.asRuntimeException());
9495
}
9596

9697
assertThrows(
@@ -150,7 +151,10 @@ public void executeRemotely_responseWithoutResult_shouldNotCrash() {
150151
public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
151152
throws IOException, InterruptedException {
152153
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
153-
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED);
154+
executionService
155+
.whenWaitExecution(DUMMY_REQUEST)
156+
.thenAck()
157+
.thenError(Status.UNAUTHENTICATED.asRuntimeException());
154158
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);
155159

156160
ExecuteResponse response =

src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,17 +100,4 @@ public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
100100
assertThat(executionService.getWaitTimes()).isEqualTo(1);
101101
assertThat(response).isEqualTo(DUMMY_RESPONSE);
102102
}
103-
104-
@Test
105-
public void executeRemotely_retryExecuteOnNoResult() throws IOException, InterruptedException {
106-
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
107-
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);
108-
109-
ExecuteResponse response =
110-
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);
111-
112-
assertThat(executionService.getExecTimes()).isEqualTo(2);
113-
assertThat(executionService.getWaitTimes()).isEqualTo(0);
114-
assertThat(response).isEqualTo(DUMMY_RESPONSE);
115-
}
116103
}

src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,18 @@ public void executeRemotely_notifyObserver() throws IOException, InterruptedExce
313313
FakeExecutionService.ackOperation(DUMMY_REQUEST),
314314
FakeExecutionService.doneOperation(DUMMY_REQUEST, DUMMY_RESPONSE));
315315
}
316+
317+
@Test
318+
public void executeRemotely_retryExecuteOnNoResultDoneOperation()
319+
throws IOException, InterruptedException {
320+
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
321+
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);
322+
323+
ExecuteResponse response =
324+
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);
325+
326+
assertThat(executionService.getExecTimes()).isEqualTo(2);
327+
assertThat(executionService.getWaitTimes()).isEqualTo(0);
328+
assertThat(response).isEqualTo(DUMMY_RESPONSE);
329+
}
316330
}

0 commit comments

Comments
 (0)