Skip to content

Commit b8c1a4e

Browse files
committed
Add following recovery test
1 parent 4d8aa66 commit b8c1a4e

1 file changed

Lines changed: 32 additions & 52 deletions

File tree

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java

Lines changed: 32 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
1515
import org.elasticsearch.action.bulk.BulkItemResponse;
1616
import org.elasticsearch.action.delete.DeleteRequest;
17-
import org.elasticsearch.action.index.IndexRequest;
1817
import org.elasticsearch.action.support.PlainActionFuture;
1918
import org.elasticsearch.action.support.replication.TransportWriteAction;
2019
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -25,16 +24,15 @@
2524
import org.elasticsearch.common.unit.ByteSizeUnit;
2625
import org.elasticsearch.common.unit.ByteSizeValue;
2726
import org.elasticsearch.common.unit.TimeValue;
28-
import org.elasticsearch.common.xcontent.XContentType;
2927
import org.elasticsearch.index.IndexSettings;
3028
import org.elasticsearch.index.engine.EngineFactory;
31-
import org.elasticsearch.index.engine.InternalEngineFactory;
3229
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
3330
import org.elasticsearch.index.seqno.SeqNoStats;
3431
import org.elasticsearch.index.shard.IndexShard;
3532
import org.elasticsearch.index.shard.IndexShardTestCase;
3633
import org.elasticsearch.index.shard.ShardId;
3734
import org.elasticsearch.index.translog.Translog;
35+
import org.elasticsearch.indices.recovery.RecoveryTarget;
3836
import org.elasticsearch.threadpool.ThreadPool;
3937
import org.elasticsearch.xpack.ccr.CcrSettings;
4038
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest;
@@ -44,21 +42,21 @@
4442
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
4543

4644
import java.io.IOException;
45+
import java.nio.charset.StandardCharsets;
4746
import java.util.ArrayList;
4847
import java.util.Arrays;
4948
import java.util.Collections;
5049
import java.util.HashSet;
5150
import java.util.List;
5251
import java.util.Set;
52+
import java.util.concurrent.Future;
5353
import java.util.concurrent.atomic.AtomicBoolean;
54-
import java.util.concurrent.atomic.AtomicInteger;
5554
import java.util.function.BiConsumer;
5655
import java.util.function.Consumer;
5756
import java.util.function.LongConsumer;
5857
import java.util.stream.Collectors;
5958

6059
import static org.hamcrest.Matchers.equalTo;
61-
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
6260
import static org.hamcrest.Matchers.is;
6361
import static org.hamcrest.Matchers.nullValue;
6462

@@ -287,56 +285,38 @@ public void testRetryBulkShardOperations() throws Exception {
287285
}
288286
}
289287

290-
/**
291-
* This is not a following test; it's mostly copied from RecoveryDuringReplicationTests#testAddNewReplicas.
292-
* This test verifies the MSU optimization which is currently only enabled on {@link FollowingEngine}.
293-
*/
294-
public void testAddNewReplicasWithFollowingEngine() throws Exception {
295-
Settings settings = Settings.builder()
296-
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
297-
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)
298-
.build();
299-
IndexMetaData indexMetaData = buildIndexMetaData(between(0, 1), settings, indexMapping);
300-
try (ReplicationGroup shards = new ReplicationGroup(indexMetaData) {
301-
@Override
302-
protected EngineFactory getEngineFactory(ShardRouting routing) {
303-
if (routing.primary()) {
304-
return new InternalEngineFactory(); // use the internal engine so we can index directly
305-
} else {
306-
return new FollowingEngineFactory();
288+
public void testAddNewFollowingReplica() throws Exception {
289+
final byte[] source = "{}".getBytes(StandardCharsets.UTF_8);
290+
final int numDocs = between(1, 100);
291+
final List<Translog.Operation> operations = new ArrayList<>(numDocs);
292+
for (int i = 0; i < numDocs; i++) {
293+
operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1));
294+
}
295+
Future<Void> recoveryFuture = null;
296+
try (ReplicationGroup group = createFollowGroup(between(0, 1))) {
297+
group.startAll();
298+
while (operations.isEmpty() == false) {
299+
List<Translog.Operation> bulkOps = randomSubsetOf(between(1, operations.size()), operations);
300+
operations.removeAll(bulkOps);
301+
BulkShardOperationsRequest bulkRequest = new BulkShardOperationsRequest(group.getPrimary().shardId(),
302+
group.getPrimary().getHistoryUUID(), bulkOps, -1);
303+
new CCRAction(bulkRequest, new PlainActionFuture<>(), group).execute();
304+
if (randomInt(100) < 10) {
305+
group.getPrimary().flush(new FlushRequest());
306+
}
307+
if (recoveryFuture == null && (randomInt(100) < 10 || operations.isEmpty())) {
308+
group.getPrimary().sync();
309+
IndexShard newReplica = group.addReplica();
310+
// We need to recover async to release the main thread for the following task to continue
311+
// to fill ops up to the current max_seq_no which the recovering replica is waiting for.
312+
recoveryFuture = group.asyncRecoverReplica(newReplica,
313+
(shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {});
307314
}
308315
}
309-
}) {
310-
shards.startAll();
311-
Thread[] threads = new Thread[between(1, 3)];
312-
AtomicBoolean isStopped = new AtomicBoolean();
313-
AtomicInteger docId = new AtomicInteger();
314-
for (int i = 0; i < threads.length; i++) {
315-
threads[i] = new Thread(() -> {
316-
while (isStopped.get() == false) {
317-
try {
318-
String id = Integer.toString(docId.incrementAndGet());
319-
shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON));
320-
if (randomInt(100) < 10) {
321-
shards.getPrimary().flush(new FlushRequest());
322-
}
323-
} catch (Exception ex) {
324-
throw new AssertionError(ex);
325-
}
326-
}
327-
});
328-
threads[i].start();
329-
}
330-
assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50)));
331-
shards.getPrimary().sync();
332-
IndexShard newReplica = shards.addReplica();
333-
shards.recoverReplica(newReplica);
334-
assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100)));
335-
isStopped.set(true);
336-
for (Thread thread : threads) {
337-
thread.join();
316+
if (recoveryFuture != null) {
317+
recoveryFuture.get();
338318
}
339-
assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary()))));
319+
group.assertAllEqual(numDocs);
340320
}
341321
}
342322

0 commit comments

Comments
 (0)