Skip to content

Commit 280ef69

Browse files
coeuvre authored and copybara-github committed
Remote: Prefetch input files into a temporary path first.
When building with "build without the bytes" and dynamic execution, we need to prefetch input files for local actions. Sometimes, multiple local actions could share the same input files, so there could be a case where multiple call sites share the same download instance. If a local action is cancelled (because the remote branch wins), the download it requested should also be cancelled, but only if that download is not shared with other local actions (or all the related local actions are cancelled). Before this change, the inputs were written to their final destination directly. This is fine if we can make sure there is no race or bug in the prefetcher. However, that is not the case: #15010. The root cause is that, when downloads are cancelled, the partially downloaded files on disk are sometimes not deleted. By making the prefetcher download inputs to a temporary path first, we can: 1. Mitigate the race: only the final move step can potentially cause the race condition. 2. Provide a way to observe the race: if there is no race, all temporary files should be either moved or deleted. But when running with this change, many temporary files exist. Working towards #12454. PiperOrigin-RevId: 447473693
1 parent 74fff55 commit 280ef69

5 files changed

Lines changed: 149 additions & 44 deletions

File tree

src/main/java/com/google/devtools/build/lib/remote/AbstractActionInputPrefetcher.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,14 @@
3333
import com.google.devtools.build.lib.profiler.SilentCloseable;
3434
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
3535
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
36+
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
37+
import com.google.devtools.build.lib.vfs.FileSystemUtils;
3638
import com.google.devtools.build.lib.vfs.Path;
3739
import io.reactivex.rxjava3.core.Completable;
3840
import io.reactivex.rxjava3.core.Flowable;
3941
import io.reactivex.rxjava3.functions.Function;
4042
import java.io.IOException;
43+
import java.util.concurrent.atomic.AtomicBoolean;
4144
import javax.annotation.Nullable;
4245

4346
/**
@@ -48,11 +51,13 @@ public abstract class AbstractActionInputPrefetcher implements ActionInputPrefet
4851
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
4952

5053
private final AsyncTaskCache.NoResult<Path> downloadCache = AsyncTaskCache.NoResult.create();
54+
private final TempPathGenerator tempPathGenerator;
5155

5256
protected final Path execRoot;
5357

54-
protected AbstractActionInputPrefetcher(Path execRoot) {
58+
protected AbstractActionInputPrefetcher(Path execRoot, TempPathGenerator tempPathGenerator) {
5559
this.execRoot = execRoot;
60+
this.tempPathGenerator = tempPathGenerator;
5661
}
5762

5863
protected abstract boolean shouldDownloadInput(
@@ -113,25 +118,41 @@ private Completable prefetchInput(MetadataProvider metadataProvider, ActionInput
113118
return downloadFileIfNot(path, (p) -> downloadInput(p, input, metadata));
114119
}
115120

116-
/** Downloads file into the {@code path} with given downloader. */
121+
/**
122+
* Downloads file into the {@code path} with given downloader.
123+
*
124+
* <p>The file will be written into a temporary file and moved to the final destination after the
125+
* download finished.
126+
*/
117127
protected Completable downloadFileIfNot(
118128
Path path, Function<Path, ListenableFuture<Void>> downloader) {
129+
AtomicBoolean completed = new AtomicBoolean(false);
119130
Completable download =
120-
toCompletable(() -> downloader.apply(path), directExecutor())
121-
.doOnComplete(() -> finalizeDownload(path))
122-
.doOnError(error -> deletePartialDownload(path))
123-
.doOnDispose(() -> deletePartialDownload(path));
131+
Completable.using(
132+
tempPathGenerator::generateTempPath,
133+
tempPath ->
134+
toCompletable(() -> downloader.apply(tempPath), directExecutor())
135+
.doOnComplete(
136+
() -> {
137+
finalizeDownload(tempPath, path);
138+
completed.set(true);
139+
}),
140+
tempPath -> {
141+
if (!completed.get()) {
142+
deletePartialDownload(tempPath);
143+
}
144+
},
145+
// Set eager=false here because we want to clean up the download *after* upstream is
146+
// disposed.
147+
/* eager= */ false);
124148
return downloadCache.executeIfNot(path, download);
125149
}
126150

127-
private void finalizeDownload(Path path) {
128-
try {
129-
// The permission of output file is changed to 0555 after action execution. We manually change
130-
// the permission here for the downloaded file to keep this behaviour consistent.
131-
path.chmod(0555);
132-
} catch (IOException e) {
133-
logger.atWarning().withCause(e).log("Failed to chmod 555 on %s", path);
134-
}
151+
private void finalizeDownload(Path tmpPath, Path path) throws IOException {
152+
// The permission of output file is changed to 0555 after action execution. We manually change
153+
// the permission here for the downloaded file to keep this behaviour consistent.
154+
tmpPath.chmod(0555);
155+
FileSystemUtils.moveFile(tmpPath, path);
135156
}
136157

137158
private void deletePartialDownload(Path path) {

src/main/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
3030
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
3131
import com.google.devtools.build.lib.remote.util.DigestUtil;
32+
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
3233
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
3334
import com.google.devtools.build.lib.sandbox.SandboxHelpers;
3435
import com.google.devtools.build.lib.vfs.Path;
@@ -49,8 +50,12 @@ class RemoteActionInputFetcher extends AbstractActionInputPrefetcher {
4950
private final RemoteCache remoteCache;
5051

5152
RemoteActionInputFetcher(
52-
String buildRequestId, String commandId, RemoteCache remoteCache, Path execRoot) {
53-
super(execRoot);
53+
String buildRequestId,
54+
String commandId,
55+
RemoteCache remoteCache,
56+
Path execRoot,
57+
TempPathGenerator tempPathGenerator) {
58+
super(execRoot, tempPathGenerator);
5459
this.buildRequestId = Preconditions.checkNotNull(buildRequestId);
5560
this.commandId = Preconditions.checkNotNull(commandId);
5661
this.remoteCache = Preconditions.checkNotNull(remoteCache);

src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import com.google.devtools.build.lib.remote.options.RemoteOptions;
7272
import com.google.devtools.build.lib.remote.options.RemoteOutputsMode;
7373
import com.google.devtools.build.lib.remote.util.DigestUtil;
74+
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
7475
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
7576
import com.google.devtools.build.lib.remote.util.Utils;
7677
import com.google.devtools.build.lib.runtime.BlazeModule;
@@ -906,7 +907,8 @@ public void registerActionContexts(
906907
}
907908

908909
@Override
909-
public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder) {
910+
public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorBuilder builder)
911+
throws AbruptExitException {
910912
Preconditions.checkState(actionInputFetcher == null, "actionInputFetcher must be null");
911913
Preconditions.checkNotNull(remoteOptions, "remoteOptions must not be null");
912914

@@ -918,12 +920,27 @@ public void executorInit(CommandEnvironment env, BuildRequest request, ExecutorB
918920
env.getOptions().getOptions(RemoteOptions.class), "RemoteOptions");
919921
RemoteOutputsMode remoteOutputsMode = remoteOptions.remoteOutputsMode;
920922
if (!remoteOutputsMode.downloadAllOutputs() && actionContextProvider.getRemoteCache() != null) {
923+
Path tempDir = env.getActionTempsDirectory().getChild("remote");
924+
try {
925+
if (tempDir.exists()
926+
&& (!tempDir.isDirectory() || !tempDir.getDirectoryEntries().isEmpty())) {
927+
env.getReporter()
928+
.handle(Event.warn("Found incomplete downloads from previous build, deleting..."));
929+
tempDir.deleteTree();
930+
}
931+
} catch (IOException e) {
932+
throw createExitException(
933+
e.getMessage(),
934+
ExitCode.LOCAL_ENVIRONMENTAL_ERROR,
935+
Code.DOWNLOADED_INPUTS_DELETION_FAILURE);
936+
}
921937
actionInputFetcher =
922938
new RemoteActionInputFetcher(
923939
env.getBuildRequestId(),
924940
env.getCommandId().toString(),
925941
actionContextProvider.getRemoteCache(),
926-
env.getExecRoot());
942+
env.getExecRoot(),
943+
new TempPathGenerator(tempDir));
927944
builder.setActionInputPrefetcher(actionInputFetcher);
928945
remoteOutputService.setActionInputFetcher(actionInputFetcher);
929946
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2022 The Bazel Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
package com.google.devtools.build.lib.remote.util;
15+
16+
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.devtools.build.lib.vfs.Path;
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
import javax.annotation.concurrent.ThreadSafe;
20+
21+
/** A generator that generates temporary paths under a given directory. */
22+
@ThreadSafe
23+
public class TempPathGenerator {
24+
private final Path tempDir;
25+
private final AtomicInteger index = new AtomicInteger();
26+
27+
public TempPathGenerator(Path tempDir) {
28+
this.tempDir = tempDir;
29+
}
30+
31+
/** Generates a temporary path */
32+
public Path generateTempPath() {
33+
return tempDir.getChild(index.getAndIncrement() + ".tmp");
34+
}
35+
36+
@VisibleForTesting
37+
public Path getTempDir() {
38+
return tempDir;
39+
}
40+
}

src/test/java/com/google/devtools/build/lib/remote/RemoteActionInputFetcherTest.java

Lines changed: 48 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.mockito.Mockito.when;
2222

2323
import build.bazel.remote.execution.v2.Digest;
24+
import com.google.common.base.Supplier;
2425
import com.google.common.collect.ImmutableList;
2526
import com.google.common.collect.ImmutableMap;
2627
import com.google.common.collect.Maps;
@@ -43,6 +44,7 @@
4344
import com.google.devtools.build.lib.remote.util.DigestUtil;
4445
import com.google.devtools.build.lib.remote.util.InMemoryCacheClient;
4546
import com.google.devtools.build.lib.remote.util.StaticMetadataProvider;
47+
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
4648
import com.google.devtools.build.lib.vfs.DigestHashFunction;
4749
import com.google.devtools.build.lib.vfs.FileSystem;
4850
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -70,6 +72,7 @@ public class RemoteActionInputFetcherTest {
7072
private static final DigestHashFunction HASH_FUNCTION = DigestHashFunction.SHA256;
7173

7274
private Path execRoot;
75+
private TempPathGenerator tempPathGenerator;
7376
private ArtifactRoot artifactRoot;
7477
private RemoteOptions options;
7578
private DigestUtil digestUtil;
@@ -79,6 +82,9 @@ public void setUp() throws IOException {
7982
FileSystem fs = new InMemoryFileSystem(new JavaClock(), HASH_FUNCTION);
8083
execRoot = fs.getPath("/exec");
8184
execRoot.createDirectoryAndParents();
85+
Path tempDir = fs.getPath("/tmp");
86+
tempDir.createDirectoryAndParents();
87+
tempPathGenerator = new TempPathGenerator(tempDir);
8288
Path dev = fs.getPath("/dev");
8389
dev.createDirectory();
8490
dev.setWritable(false);
@@ -98,7 +104,7 @@ public void testFetching() throws Exception {
98104
MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
99105
RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries);
100106
RemoteActionInputFetcher actionInputFetcher =
101-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
107+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
102108

103109
// act
104110
wait(actionInputFetcher.prefetchFiles(metadata.keySet(), metadataProvider));
@@ -121,7 +127,7 @@ public void testStagingVirtualActionInput() throws Exception {
121127
MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
122128
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
123129
RemoteActionInputFetcher actionInputFetcher =
124-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
130+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
125131
VirtualActionInput a = ActionsTestUtil.createVirtualActionInput("file1", "hello world");
126132

127133
// act
@@ -141,7 +147,7 @@ public void testStagingEmptyVirtualActionInput() throws Exception {
141147
MetadataProvider metadataProvider = new StaticMetadataProvider(new HashMap<>());
142148
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
143149
RemoteActionInputFetcher actionInputFetcher =
144-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
150+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
145151

146152
// act
147153
wait(
@@ -164,7 +170,7 @@ public void testFileNotFound() throws Exception {
164170
MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
165171
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
166172
RemoteActionInputFetcher actionInputFetcher =
167-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
173+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
168174

169175
// act
170176
assertThrows(
@@ -188,7 +194,7 @@ public void testIgnoreNoneRemoteFiles() throws Exception {
188194
MetadataProvider metadataProvider = new StaticMetadataProvider(ImmutableMap.of(a, f));
189195
RemoteCache remoteCache = newCache(options, digestUtil, new HashMap<>());
190196
RemoteActionInputFetcher actionInputFetcher =
191-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
197+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
192198

193199
// act
194200
wait(actionInputFetcher.prefetchFiles(ImmutableList.of(a), metadataProvider));
@@ -206,7 +212,7 @@ public void testDownloadFile() throws Exception {
206212
Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
207213
RemoteCache remoteCache = newCache(options, digestUtil, cacheEntries);
208214
RemoteActionInputFetcher actionInputFetcher =
209-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
215+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
210216

211217
// act
212218
actionInputFetcher.downloadFile(a1.getPath(), metadata.get(a1));
@@ -227,23 +233,15 @@ public void testDownloadFile_onInterrupt_deletePartialDownloadedFile() throws Ex
227233
Map<Digest, ByteString> cacheEntries = new HashMap<>();
228234
Artifact a1 = createRemoteArtifact("file1", "hello world", metadata, cacheEntries);
229235
RemoteCache remoteCache = mock(RemoteCache.class);
230-
when(remoteCache.downloadFile(any(), any(), any()))
231-
.thenAnswer(
232-
invocation -> {
233-
Path path = invocation.getArgument(1);
234-
Digest digest = invocation.getArgument(2);
235-
ByteString content = cacheEntries.get(digest);
236-
if (content == null) {
237-
return Futures.immediateFailedFuture(new IOException("Not found"));
238-
}
239-
content.writeTo(path.getOutputStream());
240-
241-
startSemaphore.release();
242-
return SettableFuture
243-
.create(); // A future that never complete so we can interrupt later
244-
});
236+
mockDownload(
237+
remoteCache,
238+
cacheEntries,
239+
() -> {
240+
startSemaphore.release();
241+
return SettableFuture.create(); // A future that never complete so we can interrupt later
242+
});
245243
RemoteActionInputFetcher actionInputFetcher =
246-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
244+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
247245

248246
AtomicBoolean interrupted = new AtomicBoolean(false);
249247
Thread t =
@@ -265,6 +263,7 @@ public void testDownloadFile_onInterrupt_deletePartialDownloadedFile() throws Ex
265263

266264
assertThat(interrupted.get()).isTrue();
267265
assertThat(a1.getPath().exists()).isFalse();
266+
assertThat(tempPathGenerator.getTempDir().getDirectoryEntries()).isEmpty();
268267
}
269268

270269
@Test
@@ -279,9 +278,9 @@ public void testPrefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThrea
279278
MetadataProvider metadataProvider = new StaticMetadataProvider(metadata);
280279
SettableFuture<Void> download = SettableFuture.create();
281280
RemoteCache remoteCache = mock(RemoteCache.class);
282-
when(remoteCache.downloadFile(any(), any(), any())).thenAnswer(invocation -> download);
281+
mockDownload(remoteCache, cacheEntries, () -> download);
283282
RemoteActionInputFetcher actionInputFetcher =
284-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
283+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
285284
Thread cancelledThread =
286285
new Thread(
287286
() -> {
@@ -326,6 +325,8 @@ public void testPrefetchFiles_multipleThreads_downloadIsNotCancelledByOtherThrea
326325

327326
// assert
328327
assertThat(successful.get()).isTrue();
328+
assertThat(FileSystemUtils.readContent(artifact.getPath(), StandardCharsets.UTF_8))
329+
.isEqualTo("hello world");
329330
}
330331

331332
@Test
@@ -340,9 +341,9 @@ public void testPrefetchFiles_multipleThreads_downloadIsCancelled() throws Excep
340341

341342
SettableFuture<Void> download = SettableFuture.create();
342343
RemoteCache remoteCache = mock(RemoteCache.class);
343-
when(remoteCache.downloadFile(any(), any(), any())).thenAnswer(invocation -> download);
344+
mockDownload(remoteCache, cacheEntries, () -> download);
344345
RemoteActionInputFetcher actionInputFetcher =
345-
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot);
346+
new RemoteActionInputFetcher("none", "none", remoteCache, execRoot, tempPathGenerator);
346347

347348
Thread cancelledThread1 =
348349
new Thread(
@@ -376,6 +377,8 @@ public void testPrefetchFiles_multipleThreads_downloadIsCancelled() throws Excep
376377

377378
// assert
378379
assertThat(download.isCancelled()).isTrue();
380+
assertThat(artifact.getPath().exists()).isFalse();
381+
assertThat(tempPathGenerator.getTempDir().getDirectoryEntries()).isEmpty();
379382
}
380383

381384
private Artifact createRemoteArtifact(
@@ -420,4 +423,23 @@ private static void wait(ListenableFuture<Void> future) throws IOException, Inte
420423
throw e;
421424
}
422425
}
426+
427+
private static void mockDownload(
428+
RemoteCache remoteCache,
429+
Map<Digest, ByteString> cacheEntries,
430+
Supplier<ListenableFuture<Void>> resultSupplier)
431+
throws IOException {
432+
when(remoteCache.downloadFile(any(), any(), any()))
433+
.thenAnswer(
434+
invocation -> {
435+
Path path = invocation.getArgument(1);
436+
Digest digest = invocation.getArgument(2);
437+
ByteString content = cacheEntries.get(digest);
438+
if (content == null) {
439+
return Futures.immediateFailedFuture(new IOException("Not found"));
440+
}
441+
FileSystemUtils.writeContent(path, content.toByteArray());
442+
return resultSupplier.get();
443+
});
444+
}
423445
}

0 commit comments

Comments
 (0)