Skip to content

Commit ebb2b5e

Browse files
committed
addressing comments + fix gradle check
Signed-off-by: Poojita Raj <poojiraj@amazon.com>
1 parent f366918 commit ebb2b5e

6 files changed

Lines changed: 28 additions & 4 deletions

File tree

server/src/main/java/org/opensearch/OpenSearchException.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1594,6 +1594,12 @@ private enum OpenSearchExceptionHandle {
15941594
org.opensearch.transport.NoSeedNodeLeftException::new,
15951595
160,
15961596
LegacyESVersion.V_7_10_0
1597+
),
1598+
REPLICATION_FAILED_EXCEPTION(
1599+
org.opensearch.indices.replication.common.ReplicationFailedException.class,
1600+
org.opensearch.indices.replication.common.ReplicationFailedException::new,
1601+
161,
1602+
UNKNOWN_VERSION_ADDED
15971603
);
15981604

15991605
final Class<? extends OpenSearchException> exceptionClass;

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
8585

8686
public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException {
8787
// Update the current infos reference on the Engine's reader.
88-
assert engineConfig.isReadOnlyReplica() : "Only replicas should update Infos";
8988
readerManager.updateSegments(infos);
9089

9190
// only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher
@@ -95,7 +94,6 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
9594
rollTranslogGeneration();
9695
}
9796
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
98-
readerManager.maybeRefresh();
9997
}
10098

10199
@Override

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1364,9 +1364,18 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
13641364
}
13651365
}
13661366

1367+
public Optional<NRTReplicationEngine> getReplicationEngine() {
1368+
if (getEngine() instanceof NRTReplicationEngine) {
1369+
return Optional.of((NRTReplicationEngine) getEngine());
1370+
} else {
1371+
return Optional.empty();
1372+
}
1373+
}
1374+
13671375
public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
1368-
assert getEngine() instanceof NRTReplicationEngine;
1369-
((NRTReplicationEngine) getEngine()).updateSegments(infos, seqNo);
1376+
if (getReplicationEngine().isPresent()) {
1377+
getReplicationEngine().get().updateSegments(infos, seqNo);
1378+
}
13701379
}
13711380

13721381
/**

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
157157
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
158158
final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata);
159159
logger.debug("Replication diff {}", diff);
160+
// Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot
161+
// from
162+
// source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to
163+
// fail the shard
160164
if (diff.different.isEmpty() == false) {
161165
getFilesListener.onFailure(
162166
new IllegalStateException(

server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040

4141
import java.io.IOException;
4242

43+
/**
44+
* Exception thrown if replication fails
45+
*
46+
* @opensearch.internal
47+
*/
4348
public class ReplicationFailedException extends OpenSearchException {
4449

4550
public ReplicationFailedException(IndexShard shard, Throwable cause) {

server/src/test/java/org/opensearch/ExceptionSerializationTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.opensearch.indices.InvalidIndexTemplateException;
8585
import org.opensearch.indices.recovery.PeerRecoveryNotFound;
8686
import org.opensearch.indices.recovery.RecoverFilesRecoveryException;
87+
import org.opensearch.indices.replication.common.ReplicationFailedException;
8788
import org.opensearch.ingest.IngestProcessorException;
8889
import org.opensearch.cluster.coordination.NodeHealthCheckFailureException;
8990
import org.opensearch.repositories.RepositoryException;
@@ -849,6 +850,7 @@ public void testIds() {
849850
ids.put(158, PeerRecoveryNotFound.class);
850851
ids.put(159, NodeHealthCheckFailureException.class);
851852
ids.put(160, NoSeedNodeLeftException.class);
853+
ids.put(161, ReplicationFailedException.class);
852854

853855
Map<Class<? extends OpenSearchException>, Integer> reverse = new HashMap<>();
854856
for (Map.Entry<Integer, Class<? extends OpenSearchException>> entry : ids.entrySet()) {

0 commit comments

Comments
 (0)