Skip to content

Commit fe22335

Browse files
committed
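Making waitForAssignment the same for local-store and remote-store peer recovery (shared waitForAssignmentPropagate in RecoverySourceHandler)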
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
1 parent 74ce992 commit fe22335

5 files changed

Lines changed: 70 additions & 86 deletions

File tree

server/src/main/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public LocalStorePeerRecoverySourceHandler(
5555
@Override
5656
protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure) throws IOException {
5757
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
58-
waitForAssignment(retentionLeaseRef);
58+
waitForAssignmentPropagate(retentionLeaseRef);
5959
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
6060
resources.add(retentionLock);
6161
final long startingSeqNo;

server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.lucene.util.ArrayUtil;
4242
import org.opensearch.action.ActionRunnable;
4343
import org.opensearch.action.StepListener;
44+
import org.opensearch.action.bulk.BackoffPolicy;
4445
import org.opensearch.action.support.PlainActionFuture;
4546
import org.opensearch.action.support.ThreadedActionListener;
4647
import org.opensearch.action.support.replication.ReplicationResponse;
@@ -83,12 +84,14 @@
8384
import java.util.ArrayList;
8485
import java.util.Collections;
8586
import java.util.Comparator;
87+
import java.util.Iterator;
8688
import java.util.List;
8789
import java.util.Locale;
8890
import java.util.concurrent.CopyOnWriteArrayList;
8991
import java.util.concurrent.atomic.AtomicBoolean;
9092
import java.util.concurrent.atomic.AtomicInteger;
9193
import java.util.concurrent.atomic.AtomicLong;
94+
import java.util.concurrent.atomic.AtomicReference;
9295
import java.util.function.Consumer;
9396
import java.util.function.IntSupplier;
9497
import java.util.stream.StreamSupport;
@@ -195,24 +198,49 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
195198
protected abstract void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure)
196199
throws IOException;
197200

198-
protected void waitForAssignment(SetOnce<RetentionLease> retentionLeaseRef) {
199-
RunUnderPrimaryPermit.run(() -> {
200-
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
201-
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
202-
if (targetShardRouting == null) {
203-
logger.debug(
204-
"delaying recovery of {} as it is not listed as assigned to target node {}",
205-
request.shardId(),
206-
request.targetNode()
207-
);
208-
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
209-
}
210-
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
211-
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
212-
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
213-
}
201+
/*
202+
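Waits, retrying with exponential backoff, until the cluster state that assigns the replica to the target node has propagated to this source node; throws DelayRecoveryException if the assignment is still not visible once the retries are exhausted.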
203+
*/
204+
void waitForAssignmentPropagate(SetOnce<RetentionLease> retentionLeaseRef) {
205+
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 5);
206+
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
207+
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
208+
while (backoffDelayIterator.hasNext()) {
209+
RunUnderPrimaryPermit.run(() -> {
210+
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
211+
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
212+
if (targetShardRouting.get() == null) {
213+
logger.info(
214+
"delaying recovery of {} as it is not listed as assigned to target node {}",
215+
request.shardId(),
216+
request.targetNode()
217+
);
218+
Thread.sleep(backoffDelayIterator.next().millis());
219+
}
220+
if (targetShardRouting.get() != null) {
221+
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was "
222+
+ targetShardRouting;
223+
retentionLeaseRef.set(
224+
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))
225+
);
226+
}
214227

228+
},
229+
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
230+
shard,
231+
cancellableThreads,
232+
logger
233+
);
215234

235+
if (targetShardRouting.get() != null) {
236+
return;
237+
}
238+
}
239+
if (targetShardRouting.get() != null) {
240+
return;
241+
}
242+
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
243+
}
216244

217245
protected void finalizeStepAndCompleteFuture(
218246
long startingSeqNo,

server/src/main/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandler.java

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,12 @@
1010

1111
import org.apache.lucene.index.IndexCommit;
1212
import org.opensearch.action.StepListener;
13-
import org.opensearch.action.bulk.BackoffPolicy;
14-
import org.opensearch.cluster.routing.IndexShardRoutingTable;
15-
import org.opensearch.cluster.routing.ShardRouting;
1613
import org.opensearch.common.SetOnce;
1714
import org.opensearch.common.concurrent.GatedCloseable;
1815
import org.opensearch.common.lease.Releasable;
1916
import org.opensearch.common.unit.TimeValue;
2017
import org.opensearch.core.action.ActionListener;
2118
import org.opensearch.index.engine.RecoveryEngineException;
22-
import org.opensearch.index.seqno.ReplicationTracker;
2319
import org.opensearch.index.seqno.RetentionLease;
2420
import org.opensearch.index.seqno.SequenceNumbers;
2521
import org.opensearch.index.shard.IndexShard;
@@ -28,8 +24,6 @@
2824
import org.opensearch.transport.Transports;
2925

3026
import java.io.IOException;
31-
import java.util.Iterator;
32-
import java.util.concurrent.atomic.AtomicReference;
3327
import java.util.function.Consumer;
3428

3529
/**
@@ -57,7 +51,7 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
5751
// updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed
5852
// and there is no translog replay done.
5953
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
60-
waitForAssignment(retentionLeaseRef);
54+
waitForAssignmentPropagate(retentionLeaseRef);
6155
final StepListener<SendFileResult> sendFileStep = new StepListener<>();
6256
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
6357
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
@@ -112,40 +106,4 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
112106
finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure);
113107
}
114108

115-
protected void waitForAssignment(SetOnce<RetentionLease> retentionLeaseRef) {
116-
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(
117-
TimeValue.timeValueMillis(100),
118-
3
119-
);
120-
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
121-
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
122-
while (backoffDelayIterator.hasNext() ) {
123-
RunUnderPrimaryPermit.run(() -> {
124-
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
125-
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
126-
if (targetShardRouting.get() == null) {
127-
logger.info(
128-
"delaying recovery of {} as it is not listed as assigned to target node {}",
129-
request.shardId(),
130-
request.targetNode()
131-
);
132-
Thread.sleep(backoffDelayIterator.next().millis());
133-
}
134-
if (targetShardRouting.get() != null) {
135-
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
136-
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get())));
137-
}
138-
139-
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
140-
141-
if (targetShardRouting.get() != null) {
142-
return;
143-
}
144-
}
145-
if (targetShardRouting.get() != null) {
146-
return;
147-
}
148-
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
149-
}
150-
151109
}

server/src/test/java/org/opensearch/indices/recovery/LocalStorePeerRecoverySourceHandlerTests.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import org.apache.lucene.store.IOContext;
4646
import org.apache.lucene.tests.index.RandomIndexWriter;
4747
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
48-
import org.junit.After;
49-
import org.junit.Before;
5048
import org.opensearch.ExceptionsHelper;
5149
import org.opensearch.Version;
5250
import org.opensearch.action.LatchedActionListener;
@@ -92,7 +90,6 @@
9290
import org.opensearch.index.shard.IndexShard;
9391
import org.opensearch.index.shard.IndexShardRelocatedException;
9492
import org.opensearch.index.shard.IndexShardState;
95-
import org.opensearch.index.shard.ReplicationGroup;
9693
import org.opensearch.index.store.Store;
9794
import org.opensearch.index.store.StoreFileMetadata;
9895
import org.opensearch.index.translog.Translog;
@@ -106,6 +103,8 @@
106103
import org.opensearch.threadpool.FixedExecutorBuilder;
107104
import org.opensearch.threadpool.TestThreadPool;
108105
import org.opensearch.threadpool.ThreadPool;
106+
import org.junit.After;
107+
import org.junit.Before;
109108

110109
import java.io.IOException;
111110
import java.io.OutputStream;
@@ -757,7 +756,7 @@ public void testThrowExceptionOnNoTargetInRouting() throws IOException {
757756
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
758757
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
759758
when(shard.isRelocatedPrimary()).thenReturn(false);
760-
final ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
759+
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
761760
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
762761
when(routingTable.getByAllocationId(anyString())).thenReturn(null);
763762
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
@@ -842,7 +841,7 @@ void phase2(
842841
handler.recoverToTarget(future);
843842
future.actionGet();
844843
});
845-
verify(routingTable, times(1)).getByAllocationId(null);
844+
verify(routingTable, times(3)).getByAllocationId(null);
846845
assertFalse(phase1Called.get());
847846
assertFalse(prepareTargetForTranslogCalled.get());
848847
assertFalse(phase2Called.get());

server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -118,27 +118,6 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
118118
}
119119
}
120120

121-
public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
122-
Store.MetadataSnapshot metadataSnapshot = randomBoolean()
123-
? Store.MetadataSnapshot.EMPTY
124-
: new Store.MetadataSnapshot(
125-
Collections.emptyMap(),
126-
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()),
127-
randomIntBetween(0, 100)
128-
);
129-
return new StartRecoveryRequest(
130-
shardId,
131-
null,
132-
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
133-
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
134-
metadataSnapshot,
135-
randomBoolean(),
136-
randomNonNegativeLong(),
137-
randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()
138-
);
139-
}
140-
141-
142121
public void testThrowExceptionOnNoTargetInRouting() throws IOException {
143122
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
144123
final StartRecoveryRequest request = getStartRecoveryRequest();
@@ -236,4 +215,24 @@ void phase2(
236215
assertFalse(prepareTargetForTranslogCalled.get());
237216
assertFalse(phase2Called.get());
238217
}
218+
219+
public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
220+
Store.MetadataSnapshot metadataSnapshot = randomBoolean()
221+
? Store.MetadataSnapshot.EMPTY
222+
: new Store.MetadataSnapshot(
223+
Collections.emptyMap(),
224+
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()),
225+
randomIntBetween(0, 100)
226+
);
227+
return new StartRecoveryRequest(
228+
shardId,
229+
null,
230+
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
231+
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
232+
metadataSnapshot,
233+
randomBoolean(),
234+
randomNonNegativeLong(),
235+
randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()
236+
);
237+
}
239238
}

0 commit comments

Comments
 (0)