|
14 | 14 | import org.elasticsearch.action.admin.indices.flush.FlushRequest; |
15 | 15 | import org.elasticsearch.action.bulk.BulkItemResponse; |
16 | 16 | import org.elasticsearch.action.delete.DeleteRequest; |
17 | | -import org.elasticsearch.action.index.IndexRequest; |
18 | 17 | import org.elasticsearch.action.support.PlainActionFuture; |
19 | 18 | import org.elasticsearch.action.support.replication.TransportWriteAction; |
20 | 19 | import org.elasticsearch.cluster.metadata.IndexMetaData; |
|
25 | 24 | import org.elasticsearch.common.unit.ByteSizeUnit; |
26 | 25 | import org.elasticsearch.common.unit.ByteSizeValue; |
27 | 26 | import org.elasticsearch.common.unit.TimeValue; |
28 | | -import org.elasticsearch.common.xcontent.XContentType; |
29 | 27 | import org.elasticsearch.index.IndexSettings; |
30 | 28 | import org.elasticsearch.index.engine.EngineFactory; |
31 | | -import org.elasticsearch.index.engine.InternalEngineFactory; |
32 | 29 | import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; |
33 | 30 | import org.elasticsearch.index.seqno.SeqNoStats; |
34 | 31 | import org.elasticsearch.index.shard.IndexShard; |
35 | 32 | import org.elasticsearch.index.shard.IndexShardTestCase; |
36 | 33 | import org.elasticsearch.index.shard.ShardId; |
37 | 34 | import org.elasticsearch.index.translog.Translog; |
| 35 | +import org.elasticsearch.indices.recovery.RecoveryTarget; |
38 | 36 | import org.elasticsearch.threadpool.ThreadPool; |
39 | 37 | import org.elasticsearch.xpack.ccr.CcrSettings; |
40 | 38 | import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsRequest; |
|
44 | 42 | import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; |
45 | 43 |
|
46 | 44 | import java.io.IOException; |
| 45 | +import java.nio.charset.StandardCharsets; |
47 | 46 | import java.util.ArrayList; |
48 | 47 | import java.util.Arrays; |
49 | 48 | import java.util.Collections; |
50 | 49 | import java.util.HashSet; |
51 | 50 | import java.util.List; |
52 | 51 | import java.util.Set; |
| 52 | +import java.util.concurrent.Future; |
53 | 53 | import java.util.concurrent.atomic.AtomicBoolean; |
54 | | -import java.util.concurrent.atomic.AtomicInteger; |
55 | 54 | import java.util.function.BiConsumer; |
56 | 55 | import java.util.function.Consumer; |
57 | 56 | import java.util.function.LongConsumer; |
58 | 57 | import java.util.stream.Collectors; |
59 | 58 |
|
60 | 59 | import static org.hamcrest.Matchers.equalTo; |
61 | | -import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
62 | 60 | import static org.hamcrest.Matchers.is; |
63 | 61 | import static org.hamcrest.Matchers.nullValue; |
64 | 62 |
|
@@ -287,56 +285,38 @@ public void testRetryBulkShardOperations() throws Exception { |
287 | 285 | } |
288 | 286 | } |
289 | 287 |
|
290 | | - /** |
291 | | - * This is not a following test; it's mostly copied from RecoveryDuringReplicationTests#testAddNewReplicas. |
292 | | - * This test verifies the MSU optimization which is currently only enabled on {@link FollowingEngine}. |
293 | | - */ |
294 | | - public void testAddNewReplicasWithFollowingEngine() throws Exception { |
295 | | - Settings settings = Settings.builder() |
296 | | - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) |
297 | | - .put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) |
298 | | - .build(); |
299 | | - IndexMetaData indexMetaData = buildIndexMetaData(between(0, 1), settings, indexMapping); |
300 | | - try (ReplicationGroup shards = new ReplicationGroup(indexMetaData) { |
301 | | - @Override |
302 | | - protected EngineFactory getEngineFactory(ShardRouting routing) { |
303 | | - if (routing.primary()) { |
304 | | - return new InternalEngineFactory(); // use the internal engine so we can index directly |
305 | | - } else { |
306 | | - return new FollowingEngineFactory(); |
| 288 | + public void testAddNewFollowingReplica() throws Exception { |
| 289 | + final byte[] source = "{}".getBytes(StandardCharsets.UTF_8); |
| 290 | + final int numDocs = between(1, 100); |
| 291 | + final List<Translog.Operation> operations = new ArrayList<>(numDocs); |
| 292 | + for (int i = 0; i < numDocs; i++) { |
| 293 | + operations.add(new Translog.Index("type", Integer.toString(i), i, primaryTerm, 0, source, null, -1)); |
| 294 | + } |
| 295 | + Future<Void> recoveryFuture = null; |
| 296 | + try (ReplicationGroup group = createFollowGroup(between(0, 1))) { |
| 297 | + group.startAll(); |
| 298 | + while (operations.isEmpty() == false) { |
| 299 | + List<Translog.Operation> bulkOps = randomSubsetOf(between(1, operations.size()), operations); |
| 300 | + operations.removeAll(bulkOps); |
| 301 | + BulkShardOperationsRequest bulkRequest = new BulkShardOperationsRequest(group.getPrimary().shardId(), |
| 302 | + group.getPrimary().getHistoryUUID(), bulkOps, -1); |
| 303 | + new CCRAction(bulkRequest, new PlainActionFuture<>(), group).execute(); |
| 304 | + if (randomInt(100) < 10) { |
| 305 | + group.getPrimary().flush(new FlushRequest()); |
| 306 | + } |
| 307 | + if (recoveryFuture == null && (randomInt(100) < 10 || operations.isEmpty())) { |
| 308 | + group.getPrimary().sync(); |
| 309 | + IndexShard newReplica = group.addReplica(); |
| 310 | + // We need to recover async to release the main thread for the following task to continue |
| 311 | + // to fill ops up to the current max_seq_no which the recovering replica is waiting for. |
| 312 | + recoveryFuture = group.asyncRecoverReplica(newReplica, |
| 313 | + (shard, sourceNode) -> new RecoveryTarget(shard, sourceNode, recoveryListener, l -> {}) {}); |
307 | 314 | } |
308 | 315 | } |
309 | | - }) { |
310 | | - shards.startAll(); |
311 | | - Thread[] threads = new Thread[between(1, 3)]; |
312 | | - AtomicBoolean isStopped = new AtomicBoolean(); |
313 | | - AtomicInteger docId = new AtomicInteger(); |
314 | | - for (int i = 0; i < threads.length; i++) { |
315 | | - threads[i] = new Thread(() -> { |
316 | | - while (isStopped.get() == false) { |
317 | | - try { |
318 | | - String id = Integer.toString(docId.incrementAndGet()); |
319 | | - shards.index(new IndexRequest(index.getName(), "type", id).source("{}", XContentType.JSON)); |
320 | | - if (randomInt(100) < 10) { |
321 | | - shards.getPrimary().flush(new FlushRequest()); |
322 | | - } |
323 | | - } catch (Exception ex) { |
324 | | - throw new AssertionError(ex); |
325 | | - } |
326 | | - } |
327 | | - }); |
328 | | - threads[i].start(); |
329 | | - } |
330 | | - assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(50))); |
331 | | - shards.getPrimary().sync(); |
332 | | - IndexShard newReplica = shards.addReplica(); |
333 | | - shards.recoverReplica(newReplica); |
334 | | - assertBusy(() -> assertThat(docId.get(), greaterThanOrEqualTo(100))); |
335 | | - isStopped.set(true); |
336 | | - for (Thread thread : threads) { |
337 | | - thread.join(); |
| 316 | + if (recoveryFuture != null) { |
| 317 | + recoveryFuture.get(); |
338 | 318 | } |
339 | | - assertBusy(() -> assertThat(getDocIdAndSeqNos(newReplica), equalTo(getDocIdAndSeqNos(shards.getPrimary())))); |
| 319 | + group.assertAllEqual(numDocs); |
340 | 320 | } |
341 | 321 | } |
342 | 322 |
|
|
0 commit comments