55 */
66package org .elasticsearch .index .store .cache ;
77
8- import org .apache .lucene .mockfile .FilterFileChannel ;
9- import org .apache .lucene .mockfile .FilterFileSystemProvider ;
10- import org .apache .lucene .mockfile .FilterPath ;
118import org .apache .lucene .util .SetOnce ;
129import org .elasticsearch .cluster .coordination .DeterministicTaskQueue ;
1310import org .elasticsearch .common .collect .Tuple ;
1411import org .elasticsearch .common .io .PathUtils ;
1512import org .elasticsearch .common .io .PathUtilsForTesting ;
1613import org .elasticsearch .index .store .cache .CacheFile .EvictionListener ;
14+ import org .elasticsearch .index .store .cache .TestUtils .FSyncTrackingFileSystemProvider ;
1715import org .elasticsearch .test .ESTestCase ;
1816import org .elasticsearch .threadpool .ThreadPool ;
1917import org .hamcrest .Matcher ;
2220import java .nio .channels .FileChannel ;
2321import java .nio .file .FileSystem ;
2422import java .nio .file .Files ;
25- import java .nio .file .OpenOption ;
2623import java .nio .file .Path ;
27- import java .nio .file .attribute .FileAttribute ;
28- import java .nio .file .spi .FileSystemProvider ;
2924import java .util .ArrayList ;
3025import java .util .Comparator ;
3126import java .util .Iterator ;
3227import java .util .List ;
33- import java .util .Map ;
3428import java .util .Objects ;
35- import java .util .Set ;
3629import java .util .SortedSet ;
3730import java .util .TreeSet ;
38- import java .util .concurrent .ConcurrentHashMap ;
3931import java .util .concurrent .Future ;
4032import java .util .concurrent .atomic .AtomicBoolean ;
41- import java .util .concurrent .atomic .AtomicLong ;
4233
4334import static java .util .Collections .synchronizedNavigableSet ;
4435import static org .elasticsearch .common .settings .Settings .builder ;
4536import static org .elasticsearch .index .store .cache .TestUtils .mergeContiguousRanges ;
4637import static org .elasticsearch .node .Node .NODE_NAME_SETTING ;
38+ import static org .hamcrest .Matchers .containsString ;
4739import static org .hamcrest .Matchers .equalTo ;
4840import static org .hamcrest .Matchers .hasSize ;
4941import static org .hamcrest .Matchers .is ;
@@ -196,7 +188,8 @@ public void testConcurrentAccess() throws Exception {
196188 }
197189
198190 public void testFSync () throws Exception {
199- try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem ()) {
191+ final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem ();
192+ try {
200193 final AtomicBoolean needsFSyncCalled = new AtomicBoolean ();
201194 final CacheFile cacheFile = new CacheFile (
202195 "test" ,
@@ -213,7 +206,7 @@ public void testFSync() throws Exception {
213206 try {
214207 if (randomBoolean ()) {
215208 final SortedSet <Tuple <Long , Long >> completedRanges = cacheFile .fsync ();
216- assertNumberOfFSyncs (cacheFile .getFile (), equalTo (0L ));
209+ assertNumberOfFSyncs (cacheFile .getFile (), equalTo (0 ));
217210 assertThat (completedRanges , hasSize (0 ));
218211 assertFalse (cacheFile .needsFsync ());
219212 assertFalse (needsFSyncCalled .get ());
@@ -229,18 +222,21 @@ public void testFSync() throws Exception {
229222 }
230223
231224 final SortedSet <Tuple <Long , Long >> completedRanges = cacheFile .fsync ();
232- assertNumberOfFSyncs (cacheFile .getFile (), equalTo (expectedCompletedRanges .isEmpty () ? 0L : 1L ));
225+ assertNumberOfFSyncs (cacheFile .getFile (), equalTo (expectedCompletedRanges .isEmpty () ? 0 : 1 ));
233226 assertArrayEquals (completedRanges .toArray (Tuple []::new ), expectedCompletedRanges .toArray (Tuple []::new ));
234227 assertFalse (cacheFile .needsFsync ());
235228 assertFalse (needsFSyncCalled .get ());
236229 } finally {
237230 cacheFile .release (listener );
238231 }
232+ } finally {
233+ PathUtilsForTesting .installMock (fileSystem .getDelegateInstance ());
239234 }
240235 }
241236
242237 public void testFSyncOnEvictedFile () throws Exception {
243- try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem ()) {
238+ final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem ();
239+ try {
244240 final AtomicBoolean needsFSyncCalled = new AtomicBoolean ();
245241 final CacheFile cacheFile = new CacheFile (
246242 "test" ,
@@ -262,27 +258,30 @@ public void testFSyncOnEvictedFile() throws Exception {
262258
263259 final SortedSet <Tuple <Long , Long >> completedRanges = cacheFile .fsync ();
264260 assertArrayEquals (completedRanges .toArray (Tuple []::new ), expectedCompletedRanges .toArray (Tuple []::new ));
265- assertNumberOfFSyncs (cacheFile .getFile (), equalTo (1L ));
261+ assertNumberOfFSyncs (cacheFile .getFile (), equalTo (1 ));
266262 }
267263 assertFalse (cacheFile .needsFsync ());
268264 assertFalse (needsFSyncCalled .get ());
269265
270266 cacheFile .startEviction ();
271267
272268 final SortedSet <Tuple <Long , Long >> completedRangesAfterEviction = cacheFile .fsync ();
273- assertNumberOfFSyncs (cacheFile .getFile (), equalTo (expectedCompletedRanges .isEmpty () ? 0L : 1L ));
269+ assertNumberOfFSyncs (cacheFile .getFile (), equalTo (expectedCompletedRanges .isEmpty () ? 0 : 1 ));
274270 assertThat (completedRangesAfterEviction , hasSize (0 ));
275271 assertFalse (cacheFile .needsFsync ());
276272 assertFalse (needsFSyncCalled .get ());
277273 } finally {
278274 cacheFile .release (listener );
279275 }
276+ } finally {
277+ PathUtilsForTesting .installMock (fileSystem .getDelegateInstance ());
280278 }
281279 }
282280
283281 public void testFSyncFailure () throws Exception {
284- try (FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem ()) {
285- fileSystem .failFSyncs .set (true );
282+ final FSyncTrackingFileSystemProvider fileSystem = setupFSyncCountingFileSystem ();
283+ try {
284+ fileSystem .failFSyncs (true );
286285
287286 final AtomicBoolean needsFSyncCalled = new AtomicBoolean ();
288287 final CacheFile cacheFile = new CacheFile (
@@ -302,26 +301,29 @@ public void testFSyncFailure() throws Exception {
302301 if (expectedCompletedRanges .isEmpty () == false ) {
303302 assertTrue (cacheFile .needsFsync ());
304303 assertTrue (needsFSyncCalled .getAndSet (false ));
305- expectThrows (IOException .class , cacheFile ::fsync );
304+ IOException exception = expectThrows (IOException .class , cacheFile ::fsync );
305+ assertThat (exception .getMessage (), containsString ("simulated" ));
306306 assertTrue (cacheFile .needsFsync ());
307307 assertTrue (needsFSyncCalled .getAndSet (false ));
308308 } else {
309309 assertFalse (cacheFile .needsFsync ());
310310 final SortedSet <Tuple <Long , Long >> completedRanges = cacheFile .fsync ();
311311 assertTrue (completedRanges .isEmpty ());
312312 }
313- assertNumberOfFSyncs (cacheFile .getFile (), equalTo (0L ));
313+ assertNumberOfFSyncs (cacheFile .getFile (), equalTo (0 ));
314314
315- fileSystem .failFSyncs . set (false );
315+ fileSystem .failFSyncs (false );
316316
317317 final SortedSet <Tuple <Long , Long >> completedRanges = cacheFile .fsync ();
318318 assertArrayEquals (completedRanges .toArray (Tuple []::new ), expectedCompletedRanges .toArray (Tuple []::new ));
319- assertNumberOfFSyncs (cacheFile .getFile (), equalTo (expectedCompletedRanges .isEmpty () ? 0L : 1L ));
319+ assertNumberOfFSyncs (cacheFile .getFile (), equalTo (expectedCompletedRanges .isEmpty () ? 0 : 1 ));
320320 assertFalse (cacheFile .needsFsync ());
321321 assertFalse (needsFSyncCalled .get ());
322322 } finally {
323323 cacheFile .release (listener );
324324 }
325+ } finally {
326+ PathUtilsForTesting .installMock (fileSystem .getDelegateInstance ());
325327 }
326328 }
327329
@@ -366,11 +368,11 @@ private SortedSet<Tuple<Long, Long>> randomPopulateAndReads(final CacheFile cach
366368 return mergeContiguousRanges (ranges );
367369 }
368370
369- public static void assertNumberOfFSyncs (final Path path , final Matcher <Long > matcher ) {
371+ public static void assertNumberOfFSyncs (final Path path , final Matcher <Integer > matcher ) {
370372 final FSyncTrackingFileSystemProvider provider = (FSyncTrackingFileSystemProvider ) path .getFileSystem ().provider ();
371- final AtomicLong fsyncCounter = provider .files . get (path );
372- assertThat ("File [" + path + "] was never fsynced" , fsyncCounter , notNullValue ());
373- assertThat ("Mismatching number of fsync for [" + path + "]" , fsyncCounter . get () , matcher );
373+ final Integer fsyncCount = provider .getNumberOfFSyncs (path );
374+ assertThat ("File [" + path + "] was never fsynced" , fsyncCount , notNullValue ());
375+ assertThat ("Mismatching number of fsync for [" + path + "]" , fsyncCount , matcher );
374376 }
375377
376378 private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem () {
@@ -379,48 +381,4 @@ private static FSyncTrackingFileSystemProvider setupFSyncCountingFileSystem() {
379381 PathUtilsForTesting .installMock (provider .getFileSystem (null ));
380382 return provider ;
381383 }
382-
383- /**
384- * A {@link FileSystemProvider} that counts the number of times the method {@link FileChannel#force(boolean)} is executed on every
385- * files. It reinstates the default file system when this file system provider is closed.
386- */
387- private static class FSyncTrackingFileSystemProvider extends FilterFileSystemProvider implements AutoCloseable {
388-
389- private final Map <Path , AtomicLong > files = new ConcurrentHashMap <>();
390- private final AtomicBoolean failFSyncs = new AtomicBoolean ();
391- private final FileSystem delegateInstance ;
392- private final Path rootDir ;
393-
394- FSyncTrackingFileSystemProvider (FileSystem delegate , Path rootDir ) {
395- super ("fsynccounting://" , delegate );
396- this .rootDir = new FilterPath (rootDir , this .fileSystem );
397- this .delegateInstance = delegate ;
398- }
399-
400- public Path resolve (String other ) {
401- return rootDir .resolve (other );
402- }
403-
404- @ Override
405- public FileChannel newFileChannel (Path path , Set <? extends OpenOption > options , FileAttribute <?>... attrs ) throws IOException {
406- return new FilterFileChannel (delegate .newFileChannel (toDelegate (path ), options , attrs )) {
407-
408- final AtomicLong counter = files .computeIfAbsent (path , p -> new AtomicLong (0L ));
409-
410- @ Override
411- public void force (boolean metaData ) throws IOException {
412- if (failFSyncs .get ()) {
413- throw new IOException ("simulated" );
414- }
415- super .force (metaData );
416- counter .incrementAndGet ();
417- }
418- };
419- }
420-
421- @ Override
422- public void close () {
423- PathUtilsForTesting .installMock (delegateInstance );
424- }
425- }
426384}
0 commit comments