3434import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
3535import org .elasticsearch .cluster .routing .ShardRouting ;
3636import org .elasticsearch .cluster .service .ClusterService ;
37+ import org .elasticsearch .common .Nullable ;
38+ import org .elasticsearch .common .Strings ;
3739import org .elasticsearch .common .UUIDs ;
3840import org .elasticsearch .common .component .AbstractComponent ;
3941import org .elasticsearch .common .inject .Inject ;
6567import java .io .IOException ;
6668import java .util .ArrayList ;
6769import java .util .Collections ;
70+ import java .util .HashMap ;
6871import java .util .List ;
6972import java .util .Map ;
7073import java .util .concurrent .ConcurrentMap ;
@@ -216,9 +219,16 @@ public void onResponse(InFlightOpsResponse response) {
216219 if (inflight != 0 ) {
217220 actionListener .onResponse (new ShardsSyncedFlushResult (shardId , totalShards , "[" + inflight + "] ongoing operations on primary" ));
218221 } else {
219- // 3. now send the sync request to all the shards
220- String syncId = UUIDs .randomBase64UUID ();
221- sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
222+ // 3. now send the sync request to all the shards;
223+ final String sharedSyncId = sharedExistingSyncId (presyncResponses );
224+ if (sharedSyncId != null ) {
225+ assert presyncResponses .values ().stream ().allMatch (r -> r .existingSyncId .equals (sharedSyncId )) :
226+ "Not all shards have the same existing sync id [" + sharedSyncId + "], responses [" + presyncResponses + "]" ;
227+ reportSuccessWithExistingSyncId (shardId , sharedSyncId , activeShards , totalShards , presyncResponses , actionListener );
228+ }else {
229+ String syncId = UUIDs .randomBase64UUID ();
230+ sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
231+ }
222232 }
223233 }
224234
@@ -244,6 +254,33 @@ public void onFailure(Exception e) {
244254 }
245255 }
246256
257+ private String sharedExistingSyncId (Map <String , PreSyncedFlushResponse > preSyncedFlushResponses ) {
258+ String existingSyncId = null ;
259+ for (PreSyncedFlushResponse resp : preSyncedFlushResponses .values ()) {
260+ if (Strings .isNullOrEmpty (resp .existingSyncId )) {
261+ return null ;
262+ }
263+ if (existingSyncId == null ) {
264+ existingSyncId = resp .existingSyncId ;
265+ }
266+ if (existingSyncId .equals (resp .existingSyncId ) == false ) {
267+ return null ;
268+ }
269+ }
270+ return existingSyncId ;
271+ }
272+
273+ private void reportSuccessWithExistingSyncId (ShardId shardId , String existingSyncId , List <ShardRouting > shards , int totalShards ,
274+ Map <String , PreSyncedFlushResponse > preSyncResponses , ActionListener <ShardsSyncedFlushResult > listener ) {
275+ final Map <ShardRouting , ShardSyncedFlushResponse > results = new HashMap <>();
276+ for (final ShardRouting shard : shards ) {
277+ if (preSyncResponses .containsKey (shard .currentNodeId ())) {
278+ results .put (shard , new ShardSyncedFlushResponse ());
279+ }
280+ }
281+ listener .onResponse (new ShardsSyncedFlushResult (shardId , existingSyncId , totalShards , results ));
282+ }
283+
247284 final IndexShardRoutingTable getShardRoutingTable (ShardId shardId , ClusterState state ) {
248285 final IndexRoutingTable indexRoutingTable = state .routingTable ().index (shardId .getIndexName ());
249286 if (indexRoutingTable == null ) {
@@ -438,7 +475,7 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
438475 final CommitStats commitStats = indexShard .commitStats ();
439476 final Engine .CommitId commitId = commitStats .getRawCommitId ();
440477 logger .trace ("{} pre sync flush done. commit id {}, num docs {}" , request .shardId (), commitId , commitStats .getNumDocs ());
441- return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs ());
478+ return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs (), commitStats . syncId () );
442479 }
443480
444481 private ShardSyncedFlushResponse performSyncedFlush (ShardSyncedFlushRequest request ) {
@@ -512,41 +549,49 @@ static final class PreSyncedFlushResponse extends TransportResponse {
512549
513550 Engine .CommitId commitId ;
514551 int numDocs ;
552+ @ Nullable String existingSyncId = null ;
515553
516554 PreSyncedFlushResponse () {
517555 }
518556
519- PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs ) {
557+ PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs , String existingSyncId ) {
520558 this .commitId = commitId ;
521559 this .numDocs = numDocs ;
560+ this .existingSyncId = existingSyncId ;
522561 }
523562
524- Engine . CommitId commitId ( ) {
525- return commitId ;
563+ boolean includeNumDocs ( Version version ) {
564+ return version . onOrAfter ( Version . V_5_6_8 ) ;
526565 }
527566
528- int numDocs ( ) {
529- return numDocs ;
567+ boolean includeExistingSyncId ( Version version ) {
568+ return version . onOrAfter ( Version . V_5_6_9 ) ;
530569 }
531570
532571 @ Override
533572 public void readFrom (StreamInput in ) throws IOException {
534573 super .readFrom (in );
535574 commitId = new Engine .CommitId (in );
536- if (in .getVersion (). onOrAfter ( Version . V_5_6_8 )) {
575+ if (includeNumDocs ( in .getVersion ())) {
537576 numDocs = in .readInt ();
538577 } else {
539578 numDocs = UNKNOWN_NUM_DOCS ;
540579 }
580+ if (includeExistingSyncId (in .getVersion ())) {
581+ existingSyncId = in .readOptionalString ();
582+ }
541583 }
542584
543585 @ Override
544586 public void writeTo (StreamOutput out ) throws IOException {
545587 super .writeTo (out );
546588 commitId .writeTo (out );
547- if (out .getVersion (). onOrAfter ( Version . V_5_6_8 )) {
589+ if (includeNumDocs ( out .getVersion ())) {
548590 out .writeInt (numDocs );
549591 }
592+ if (includeExistingSyncId (out .getVersion ())) {
593+ out .writeOptionalString (existingSyncId );
594+ }
550595 }
551596 }
552597
0 commit comments