Skip to content

Commit f47e56e

Browse files
committed
Handle the synced-flush case which skips most of phase 1
1 parent d63e777 commit f47e56e

2 files changed

Lines changed: 59 additions & 15 deletions

File tree

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 23 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -430,14 +430,6 @@ static final class SendFileResult {
430430
void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> createRetentionLease,
431431
IntSupplier translogOps, ActionListener<SendFileResult> listener) {
432432
cancellableThreads.checkForCancel();
433-
// Total size of segment files that are recovered
434-
long totalSizeInBytes = 0;
435-
// Total size of segment files that were able to be re-used
436-
long existingTotalSizeInBytes = 0;
437-
final List<String> phase1FileNames = new ArrayList<>();
438-
final List<Long> phase1FileSizes = new ArrayList<>();
439-
final List<String> phase1ExistingFileNames = new ArrayList<>();
440-
final List<Long> phase1ExistingFileSizes = new ArrayList<>();
441433
final Store store = shard.store();
442434
try {
443435
StopWatch stopWatch = new StopWatch().start();
@@ -457,6 +449,16 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
457449
}
458450
}
459451
if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {
452+
final List<String> phase1FileNames = new ArrayList<>();
453+
final List<Long> phase1FileSizes = new ArrayList<>();
454+
final List<String> phase1ExistingFileNames = new ArrayList<>();
455+
final List<Long> phase1ExistingFileSizes = new ArrayList<>();
456+
457+
// Total size of segment files that are recovered
458+
long totalSizeInBytes = 0;
459+
// Total size of segment files that were able to be re-used
460+
long existingTotalSizeInBytes = 0;
461+
460462
// Generate a "diff" of all the identical, different, and missing
461463
// segment files on the target node, using the existing files on
462464
// the source node
@@ -524,15 +526,21 @@ void phase1(IndexCommit snapshot, Consumer<ActionListener<RetentionLease>> creat
524526
phase1ExistingFileSizes, existingTotalSize, took));
525527
}, listener::onFailure);
526528
} else {
527-
logger.trace("skipping [phase1]- identical sync id [{}] found on both source and target",
528-
recoverySourceMetadata.getSyncId());
529-
final TimeValue took = stopWatch.totalTime();
530-
logger.trace("recovery [phase1]: took [{}]", took);
531-
listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSizeInBytes, phase1ExistingFileNames,
532-
phase1ExistingFileSizes, existingTotalSizeInBytes, took));
529+
logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());
530+
531+
// but we must still create a retention lease
532+
final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();
533+
createRetentionLease.accept(createRetentionLeaseStep);
534+
createRetentionLeaseStep.whenComplete(retentionLease -> {
535+
final TimeValue took = stopWatch.totalTime();
536+
logger.trace("recovery [phase1]: took [{}]", took);
537+
listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(),
538+
Collections.emptyList(), 0L, took));
539+
}, listener::onFailure);
540+
533541
}
534542
} catch (Exception e) {
535-
throw new RecoverFilesRecoveryException(request.shardId(), phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes), e);
543+
throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e);
536544
}
537545
}
538546

server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 36 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.action.support.PlainActionFuture;
3838
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
3939
import org.elasticsearch.action.support.replication.ReplicationResponse;
40+
import org.elasticsearch.cluster.ClusterState;
4041
import org.elasticsearch.cluster.action.shard.ShardStateAction;
4142
import org.elasticsearch.cluster.metadata.IndexMetaData;
4243
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -876,6 +877,7 @@ public void testHistoryRetention() throws Exception {
876877
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(secondNodeToStop));
877878

878879
final long desyncNanoTime = System.nanoTime();
880+
//noinspection StatementWithEmptyBody
879881
while (System.nanoTime() <= desyncNanoTime) {
880882
// time passes
881883
}
@@ -1015,6 +1017,40 @@ public void testRecoveryFlushReplica() throws Exception {
10151017
assertThat(syncIds, hasSize(1));
10161018
}
10171019

1020+
public void testRecoveryUsingSyncedFlushWithoutRetentionLease() throws Exception {
1021+
internalCluster().ensureAtLeastNumDataNodes(2);
1022+
String indexName = "test-index";
1023+
createIndex(indexName, Settings.builder()
1024+
.put("index.number_of_shards", 1)
1025+
.put("index.number_of_replicas", 1)
1026+
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "24h") // do not reallocate the lost shard
1027+
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_PERIOD_SETTING.getKey(), "100ms") // expire leases quickly
1028+
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") // sync frequently
1029+
.build());
1030+
int numDocs = randomIntBetween(0, 10);
1031+
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, numDocs)
1032+
.mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
1033+
ensureGreen(indexName);
1034+
1035+
final ShardId shardId = new ShardId(resolveIndex(indexName), 0);
1036+
assertThat(SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId).successfulShards(), equalTo(2));
1037+
1038+
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
1039+
final ShardRouting shardToResync = randomFrom(clusterState.routingTable().shardRoutingTable(shardId).activeShards());
1040+
internalCluster().restartNode(clusterState.nodes().get(shardToResync.currentNodeId()).getName(),
1041+
new InternalTestCluster.RestartCallback() {
1042+
@Override
1043+
public Settings onNodeStopped(String nodeName) throws Exception {
1044+
assertBusy(() -> assertFalse(client().admin().indices().prepareStats(indexName).get()
1045+
.getShards()[0].getRetentionLeaseStats().retentionLeases().contains(
1046+
ReplicationTracker.getPeerRecoveryRetentionLeaseId(shardToResync))));
1047+
return super.onNodeStopped(nodeName);
1048+
}
1049+
});
1050+
1051+
ensureGreen(indexName);
1052+
}
1053+
10181054
public void testRecoverLocallyUpToGlobalCheckpoint() throws Exception {
10191055
internalCluster().ensureAtLeastNumDataNodes(2);
10201056
List<String> nodes = randomSubsetOf(2, StreamSupport.stream(clusterService().state().nodes().getDataNodes().spliterator(), false)

0 commit comments

Comments (0)