Skip to content

Commit 5091a5d

Browse files
committed
mutualize FSyncTrackingFileSystemProvider
1 parent 74f728f commit 5091a5d

2 files changed

Lines changed: 45 additions & 70 deletions

File tree

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CacheFileTests.java

Lines changed: 28 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,13 @@
55
*/
66
package org.elasticsearch.index.store.cache;
77

8-
import org.apache.lucene.mockfile.FilterFileChannel;
9-
import org.apache.lucene.mockfile.FilterFileSystemProvider;
10-
import org.apache.lucene.mockfile.FilterPath;
118
import org.apache.lucene.util.SetOnce;
129
import org.elasticsearch.cluster.coordination.DeterministicTaskQueue;
1310
import org.elasticsearch.common.collect.Tuple;
1411
import org.elasticsearch.common.io.PathUtils;
1512
import org.elasticsearch.common.io.PathUtilsForTesting;
1613
import org.elasticsearch.index.store.cache.CacheFile.EvictionListener;
14+
import org.elasticsearch.index.store.cache.TestUtils.FSyncTrackingFileSystemProvider;
1715
import org.elasticsearch.test.ESTestCase;
1816
import org.elasticsearch.threadpool.ThreadPool;
1917
import org.hamcrest.Matcher;
@@ -22,28 +20,22 @@
2220
import java.nio.channels.FileChannel;
2321
import java.nio.file.FileSystem;
2422
import java.nio.file.Files;
25-
import java.nio.file.OpenOption;
2623
import java.nio.file.Path;
27-
import java.nio.file.attribute.FileAttribute;
28-
import java.nio.file.spi.FileSystemProvider;
2924
import java.util.ArrayList;
3025
import java.util.Comparator;
3126
import java.util.Iterator;
3227
import java.util.List;
33-
import java.util.Map;
3428
import java.util.Objects;
35-
import java.util.Set;
3629
import java.util.SortedSet;
3730
import java.util.TreeSet;
38-
import java.util.concurrent.ConcurrentHashMap;
3931
import java.util.concurrent.Future;
4032
import java.util.concurrent.atomic.AtomicBoolean;
41-
import java.util.concurrent.atomic.AtomicLong;
4233

4334
import static java.util.Collections.synchronizedNavigableSet;
4435
import static org.elasticsearch.common.settings.Settings.builder;
4536
import static org.elasticsearch.index.store.cache.TestUtils.mergeContiguousRanges;
4637
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
38+
import static org.hamcrest.Matchers.containsString;
4739
import static org.hamcrest.Matchers.equalTo;
4840
import static org.hamcrest.Matchers.hasSize;
4941
import static org.hamcrest.Matchers.is;
@@ -196,7 +188,8 @@ public void testConcurrentAccess() throws Exception {
196188
}
197189

198190
public void testFSync() throws Exception {
199-
try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) {
191+
final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem();
192+
try {
200193
final AtomicBoolean needsFSyncCalled = new AtomicBoolean();
201194
final CacheFile cacheFile = new CacheFile(
202195
"test",
@@ -213,7 +206,7 @@ public void testFSync() throws Exception {
213206
try {
214207
if (randomBoolean()) {
215208
final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
216-
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L));
209+
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0));
217210
assertThat(completedRanges, hasSize(0));
218211
assertFalse(cacheFile.needsFsync());
219212
assertFalse(needsFSyncCalled.get());
@@ -229,18 +222,21 @@ public void testFSync() throws Exception {
229222
}
230223

231224
final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
232-
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L));
225+
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1));
233226
assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new));
234227
assertFalse(cacheFile.needsFsync());
235228
assertFalse(needsFSyncCalled.get());
236229
} finally {
237230
cacheFile.release(listener);
238231
}
232+
} finally {
233+
PathUtilsForTesting.installMock(fileSystem.getDelegateInstance());
239234
}
240235
}
241236

242237
public void testFSyncOnEvictedFile() throws Exception {
243-
try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) {
238+
final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem();
239+
try {
244240
final AtomicBoolean needsFSyncCalled = new AtomicBoolean();
245241
final CacheFile cacheFile = new CacheFile(
246242
"test",
@@ -262,27 +258,30 @@ public void testFSyncOnEvictedFile() throws Exception {
262258

263259
final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
264260
assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new));
265-
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1L));
261+
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(1));
266262
}
267263
assertFalse(cacheFile.needsFsync());
268264
assertFalse(needsFSyncCalled.get());
269265

270266
cacheFile.startEviction();
271267

272268
final SortedSet<Tuple<Long, Long>> completedRangesAfterEviction = cacheFile.fsync();
273-
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L));
269+
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1));
274270
assertThat(completedRangesAfterEviction, hasSize(0));
275271
assertFalse(cacheFile.needsFsync());
276272
assertFalse(needsFSyncCalled.get());
277273
} finally {
278274
cacheFile.release(listener);
279275
}
276+
} finally {
277+
PathUtilsForTesting.installMock(fileSystem.getDelegateInstance());
280278
}
281279
}
282280

283281
public void testFSyncFailure() throws Exception {
284-
try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem()) {
285-
fileSystem.failFSyncs.set(true);
282+
final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem();
283+
try {
284+
fileSystem.failFSyncs(true);
286285

287286
final AtomicBoolean needsFSyncCalled = new AtomicBoolean();
288287
final CacheFile cacheFile = new CacheFile(
@@ -302,26 +301,29 @@ public void testFSyncFailure() throws Exception {
302301
if (expectedCompletedRanges.isEmpty() == false) {
303302
assertTrue(cacheFile.needsFsync());
304303
assertTrue(needsFSyncCalled.getAndSet(false));
305-
expectThrows(IOException.class, cacheFile::fsync);
304+
IOException exception = expectThrows(IOException.class, cacheFile::fsync);
305+
assertThat(exception.getMessage(), containsString("simulated"));
306306
assertTrue(cacheFile.needsFsync());
307307
assertTrue(needsFSyncCalled.getAndSet(false));
308308
} else {
309309
assertFalse(cacheFile.needsFsync());
310310
final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
311311
assertTrue(completedRanges.isEmpty());
312312
}
313-
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0L));
313+
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(0));
314314

315-
fileSystem.failFSyncs.set(false);
315+
fileSystem.failFSyncs(false);
316316

317317
final SortedSet<Tuple<Long, Long>> completedRanges = cacheFile.fsync();
318318
assertArrayEquals(completedRanges.toArray(Tuple[]::new), expectedCompletedRanges.toArray(Tuple[]::new));
319-
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0L : 1L));
319+
assertNumberOfFSyncs(cacheFile.getFile(), equalTo(expectedCompletedRanges.isEmpty() ? 0 : 1));
320320
assertFalse(cacheFile.needsFsync());
321321
assertFalse(needsFSyncCalled.get());
322322
} finally {
323323
cacheFile.release(listener);
324324
}
325+
} finally {
326+
PathUtilsForTesting.installMock(fileSystem.getDelegateInstance());
325327
}
326328
}
327329

@@ -366,11 +368,11 @@ private SortedSet<Tuple<Long, Long>> randomPopulateAndReads(final CacheFile cach
366368
return mergeContiguousRanges(ranges);
367369
}
368370

369-
public static void assertNumberOfFSyncs(final Path path, final Matcher<Long> matcher) {
371+
public static void assertNumberOfFSyncs(final Path path, final Matcher<Integer> matcher) {
370372
final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider) path.getFileSystem().provider();
371-
final AtomicLong fsyncCounter = provider.files.get(path);
372-
assertThat("File [" + path + "] was never fsynced", fsyncCounter, notNullValue());
373-
assertThat("Mismatching number of fsync for [" + path + "]", fsyncCounter.get(), matcher);
373+
final Integer fsyncCount = provider.getNumberOfFSyncs(path);
374+
assertThat("File [" + path + "] was never fsynced", fsyncCount, notNullValue());
375+
assertThat("Mismatching number of fsync for [" + path + "]", fsyncCount, matcher);
374376
}
375377

376378
private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() {
@@ -379,48 +381,4 @@ private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() {
379381
PathUtilsForTesting.installMock(provider.getFileSystem(null));
380382
return provider;
381383
}
382-
383-
/**
384-
* A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every
385-
* files. It reinstates the default file system when this file system provider is closed.
386-
*/
387-
private static class FSyncTrackingFileSystemProvider extends FilterFileSystemProvider implements AutoCloseable {
388-
389-
private final Map<Path, AtomicLong> files = new ConcurrentHashMap<>();
390-
private final AtomicBoolean failFSyncs = new AtomicBoolean();
391-
private final FileSystem delegateInstance;
392-
private final Path rootDir;
393-
394-
FSyncTrackingFileSystemProvider(FileSystem delegate, Path rootDir) {
395-
super("fsynccounting://", delegate);
396-
this.rootDir = new FilterPath(rootDir, this.fileSystem);
397-
this.delegateInstance = delegate;
398-
}
399-
400-
public Path resolve(String other) {
401-
return rootDir.resolve(other);
402-
}
403-
404-
@Override
405-
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
406-
return new FilterFileChannel(delegate.newFileChannel(toDelegate(path), options, attrs)) {
407-
408-
final AtomicLong counter = files.computeIfAbsent(path, p -> new AtomicLong(0L));
409-
410-
@Override
411-
public void force(boolean metaData) throws IOException {
412-
if (failFSyncs.get()) {
413-
throw new IOException("simulated");
414-
}
415-
super.force(metaData);
416-
counter.incrementAndGet();
417-
}
418-
};
419-
}
420-
421-
@Override
422-
public void close() {
423-
PathUtilsForTesting.installMock(delegateInstance);
424-
}
425-
}
426384
}

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/TestUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.TreeSet;
4343
import java.util.concurrent.ConcurrentHashMap;
4444
import java.util.concurrent.Future;
45+
import java.util.concurrent.atomic.AtomicBoolean;
4546
import java.util.concurrent.atomic.AtomicInteger;
4647

4748
import static java.util.Collections.synchronizedNavigableSet;
@@ -299,6 +300,7 @@ public void putAsync(
299300
public static class FSyncTrackingFileSystemProvider extends FilterFileSystemProvider {
300301

301302
private final Map<Path, AtomicInteger> files = new ConcurrentHashMap<>();
303+
private final AtomicBoolean failFSyncs = new AtomicBoolean();
302304
private final FileSystem delegateInstance;
303305
private final Path rootDir;
304306

@@ -308,6 +310,18 @@ public FSyncTrackingFileSystemProvider(FileSystem delegate, Path rootDir) {
308310
this.delegateInstance = delegate;
309311
}
310312

313+
public FileSystem getDelegateInstance() {
314+
return delegateInstance;
315+
}
316+
317+
public void failFSyncs(boolean shouldFail) {
318+
failFSyncs.set(shouldFail);
319+
}
320+
321+
public Path resolve(String other) {
322+
return rootDir.resolve(other);
323+
}
324+
311325
@Nullable
312326
public Integer getNumberOfFSyncs(Path path) {
313327
final AtomicInteger counter = files.get(path);
@@ -321,6 +335,9 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options,
321335

322336
@Override
323337
public void force(boolean metaData) throws IOException {
338+
if (failFSyncs.get()) {
339+
throw new IOException("simulated");
340+
}
324341
super.force(metaData);
325342
counter.incrementAndGet();
326343
}

0 commit comments

Comments
 (0)