Skip to content

Commit a3f73b7

Browse files
Meh ThreadPoolMergeSchedulerStressTestIT
1 parent 4d0cab2 commit a3f73b7

1 file changed

Lines changed: 10 additions & 6 deletions

File tree

server/src/internalClusterTest/java/org/elasticsearch/index/engine/ThreadPoolMergeSchedulerStressTestIT.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.Semaphore;
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.concurrent.atomic.AtomicBoolean;
44+
import java.util.concurrent.atomic.AtomicInteger;
4445
import java.util.concurrent.atomic.AtomicReference;
4546

4647
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAllSuccessful;
@@ -257,12 +258,15 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
257258
indexingThreads[i].start();
258259
}
259260
TestEnginePlugin testEnginePlugin = getTestEnginePlugin();
261+
// get the segments count before unblocking the merge threads
262+
AtomicInteger segmentsCountBefore = new AtomicInteger();
260263
assertBusy(() -> {
261264
// wait for merges to enqueue or backlog
262265
assertThat(testEnginePlugin.enqueuedMergesSet.size(), greaterThanOrEqualTo(testEnginePlugin.waitMergesEnqueuedCount));
266+
segmentsCountBefore.set(getSegmentsCountForAllShards("index"));
267+
// there are at least 2 segments for each outstanding merge
268+
assertThat(segmentsCountBefore.get(), greaterThan(2 * testEnginePlugin.waitMergesEnqueuedCount));
263269
}, 1, TimeUnit.MINUTES);
264-
// get the segments count before unblocking the merge threads
265-
var segmentsBefore = getSegmentsCountForAllShards("index");
266270
// finish up indexing
267271
indexingDone.set(true);
268272
for (Thread indexingThread : indexingThreads) {
@@ -276,11 +280,9 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
276280
assertThat(testEnginePlugin.enqueuedMergesSet.size(), is(0));
277281
testEnginePlugin.mergeExecutorServiceReference.get().allDone();
278282
}, 1, TimeUnit.MINUTES);
279-
// refresh, otherwise we'd be still seeing the old merged-away segments
280-
assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
281-
var segmentsAfter = getSegmentsCountForAllShards("index");
283+
var segmentsCountAfter = getSegmentsCountForAllShards("index");
282284
// there should be way fewer segments after merging completed
283-
assertThat(segmentsBefore, greaterThan(segmentsAfter));
285+
assertThat(segmentsCountBefore.get(), greaterThan(segmentsCountAfter));
284286
// let's also run a force-merge
285287
assertAllSuccessful(indicesAdmin().prepareForceMerge("index").setMaxNumSegments(1).get());
286288
assertAllSuccessful(indicesAdmin().prepareRefresh("index").get());
@@ -297,6 +299,8 @@ public void testMergingFallsBehindAndThenCatchesUp() throws Exception {
297299
}
298300

299301
private int getSegmentsCountForAllShards(String indexName) {
302+
// refresh, otherwise we'd be still seeing the old merged-away segments
303+
assertAllSuccessful(indicesAdmin().prepareRefresh(indexName).get());
300304
int count = 0;
301305
IndicesSegmentResponse indicesSegmentResponse = indicesAdmin().prepareSegments(indexName).get();
302306
Iterator<IndexShardSegments> indexShardSegmentsIterator = indicesSegmentResponse.getIndices().get(indexName).iterator();

0 commit comments

Comments
 (0)