
[Segment Replication] Avoid data loss in vanilla segment replication#20150

Merged
linuxpi merged 13 commits into opensearch-project:main from
guojialiang92:dev/fix-data-loss-in-vanilla-segment-replication
Dec 10, 2025

Conversation

@guojialiang92
Contributor

@guojialiang92 guojialiang92 commented Dec 2, 2025

Description

The purpose of this PR is to address the issue of data loss in vanilla segment replication mentioned in #20118.

Related Issues

Resolves #20118

Instructions

When constructing CopyState, store Math.min(maxSeqNo, lastRefreshedCheckpoint) as max_seq_no. This keeps the userData structure consistent between segment replication and the remote store.

Some context first: the max_seq_no here (Math.min(maxSeqNo, lastRefreshedCheckpoint)) is read from the userData of segmentInfosSnapshot during the initialization of CopyState, and it was recorded in the userData during the last commitIndexWriter. In other words, it is not the latest value recorded in InternalEngine#localCheckpointTracker.
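The clamping itself amounts to a one-line adjustment of the commit userData. A minimal sketch of the idea, with an illustrative key name and helper method (not the actual OpenSearch CopyState code):

```java
import java.util.HashMap;
import java.util.Map;

public class CopyStateClampSketch {
    // Illustrative key; OpenSearch records max_seq_no under a similar key in commit userData.
    static final String MAX_SEQ_NO_KEY = "max_seq_no";

    // Return a copy of the commit userData with max_seq_no clamped to
    // min(maxSeqNo, lastRefreshedCheckpoint), mirroring what the PR does
    // for the SegmentInfos snapshot.
    static Map<String, String> clampUserData(Map<String, String> userData, long lastRefreshedCheckpoint) {
        long maxSeqNo = Long.parseLong(userData.get(MAX_SEQ_NO_KEY));
        Map<String, String> clamped = new HashMap<>(userData);
        clamped.put(MAX_SEQ_NO_KEY, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
        return clamped;
    }

    public static void main(String[] args) {
        Map<String, String> userData = new HashMap<>();
        userData.put(MAX_SEQ_NO_KEY, "7");
        // A refreshed checkpoint behind the committed max_seq_no clamps 7 down to 5.
        System.out.println(clampUserData(userData, 5).get(MAX_SEQ_NO_KEY)); // 5
        // A refreshed checkpoint at or ahead of it leaves max_seq_no untouched.
        System.out.println(clampUserData(userData, 9).get(MAX_SEQ_NO_KEY)); // 7
    }
}
```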

I did not put lastRefreshedCheckpoint directly into userData's max_seq_no. This choice ensures that the test SegmentReplicationIT#testRestartPrimary still passes under the new logic; a detailed explanation follows.

This test simulates the process of writing a document, restarting the primary shard node after completing segment replication, and waiting for the cluster to reach the green state.

Let's first take a look at the execution process in the current main branch.

  1. When the segment replication is completed, the SegmentInfos#version of the primary shard and the replica are equal (it is 9 in the test).
  2. When the primary shard node restarts, it performs a flush. Since the condition is met, the primary shard performs commitIndexWriter, and SegmentInfos#version is incremented by 1 each in IndexWriter#setLiveCommitData and IndexWriter#prepareCommitInternal (updated from 9 to 11).
  3. During the primary promotion, the replica first closes the NRTReplicationEngine, and its SegmentInfos#version is incremented by 1 in NRTReplicationEngine#closeNoLock (updated from 9 to 10). It then switches to the InternalEngine, performs translog recovery (the actual number of recovered operations is 0), and executes a flush. In the current logic, because the condition is met (getProcessedLocalCheckpoint() is 0, local_checkpoint is -1), the shard performs commitIndexWriter, and SegmentInfos#version is incremented by 1 each in IndexWriter#setLiveCommitData and IndexWriter#prepareCommitInternal (updated from 10 to 12).
  4. After the primary promotion, the new primary shard sends a force segment replication request to the replica. Since the segmentInfosVersion of the primary shard (12) is greater than the segmentInfosVersion of the replica (11), force segment replication executes normally.

Next, let's take a look at the execution process when lastRefreshedCheckpoint is used directly.

  1. Same as above.
  2. Same as above.
  3. During the primary promotion, the replica first closes the NRTReplicationEngine, and its SegmentInfos#version is incremented by 1 in NRTReplicationEngine#closeNoLock (updated from 9 to 10). It then switches to the InternalEngine, performs translog recovery (the actual number of recovered operations is 0), and executes a flush. Since the local checkpoint is updated using the latest refreshed checkpoint, the condition is not met (getProcessedLocalCheckpoint() is 0, local_checkpoint is 0), and SegmentInfos#version remains 10.
  4. After the primary promotion, the new primary shard sends a force segment replication request to the replica. During segment replication, the replica detects that the segmentInfosVersion of the primary shard (10) is less than its local segmentInfosVersion (11), which causes the recovery of the replica to fail.
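The version bookkeeping in the two walkthroughs above reduces to simple arithmetic. A hedged sketch of that arithmetic (the +2 per commit and +1 on NRT-engine close are taken from the steps above, not from Lucene itself):

```java
public class SegmentInfosVersionSketch {
    // commitIndexWriter bumps SegmentInfos#version once in setLiveCommitData
    // and once in prepareCommitInternal, per the walkthrough.
    static long afterCommit(long version) { return version + 2; }

    // NRTReplicationEngine#closeNoLock bumps the version once.
    static long afterNrtClose(long version) { return version + 1; }

    public static void main(String[] args) {
        long start = 9; // both copies after segment replication completes

        // Main branch: the restarted old primary commits on flush; the promoted
        // replica closes the NRT engine and then commits as well.
        long restartedOldPrimary = afterCommit(start);            // 11
        long promotedReplica = afterCommit(afterNrtClose(start)); // 12
        System.out.println(promotedReplica >= restartedOldPrimary); // true -> force replication succeeds

        // Raw lastRefreshedCheckpoint: the promoted replica skips the commit.
        long promotedNoCommit = afterNrtClose(start);             // 10
        System.out.println(promotedNoCommit >= restartedOldPrimary); // false -> replication fails
    }
}
```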

Finally, let's take a look at the solution using Math.min(maxSeqNo, lastRefreshedCheckpoint).

We need to ensure that after the primary promotion, the SegmentInfos#version of the primary shard is always greater than or equal to that of the replicas. At the same time, it is also necessary to ensure that there is no risk of data loss.

During the primary promotion process, restoreVersionMapAndCheckpointTracker ensures the live version map and checkpoint tracker are in sync with the Lucene commit. Therefore, we only need to ensure that the local_checkpoint of the replica shard is less than the checkpoint in the Lucene commit, rather than strictly equal.

When using Math.min(maxSeqNo, lastRefreshedCheckpoint), there are the following scenarios.

  1. maxSeqNo < lastRefreshedCheckpoint. The old primary shard has not yet performed a flush. During the restart of the old primary shard and the promotion of the new primary shard, commitIndexWriter is executed because the condition is met.
  2. maxSeqNo = lastRefreshedCheckpoint. After the last doc was written, a flush was performed. During the restart of the old primary shard and the promotion of the new primary shard, neither executes commitIndexWriter because the condition is not met.
  3. maxSeqNo > lastRefreshedCheckpoint. There are documents in the translog of the old primary shard that have not been indexed. During the restart of the old primary shard and the promotion of the new primary shard, commitIndexWriter is executed because the condition is met.

All of the scenarios above ensure that the segmentInfosVersion of the primary shard is greater than or equal to that of the replica. Meanwhile, since the local_checkpoint of the replica is less than or equal to the checkpoint of the Lucene commit, no data loss occurs during primary promotion.
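The commit decision driving all three scenarios can be sketched as a single comparison; the method name is illustrative, not the actual InternalEngine API:

```java
public class FlushDecisionSketch {
    // commitIndexWriter runs only when the processed local checkpoint has
    // advanced past the checkpoint recorded in the last Lucene commit.
    static boolean shouldCommit(long processedLocalCheckpoint, long committedLocalCheckpoint) {
        return processedLocalCheckpoint > committedLocalCheckpoint;
    }

    public static void main(String[] args) {
        // Scenario 2 (maxSeqNo == lastRefreshedCheckpoint): checkpoints agree,
        // no commit runs, so SegmentInfos#version is not bumped.
        System.out.println(shouldCommit(0, 0));  // false
        // Scenarios 1 and 3: the committed checkpoint lags, so the flush commits
        // and SegmentInfos#version advances on both shards.
        System.out.println(shouldCommit(0, -1)); // true
    }
}
```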

Additionally, note that the remote store does not have this issue. Although SegmentInfos#version is updated to 11 when the old primary shard node restarts (because of the flush operation), which is greater than that of the old replica, the new replica downloads the segment files from the remote store when it starts, ensuring that force segment replication can execute normally.

Test

Introduce the test SegmentReplicationPrimaryPromotionIT#testPrimaryStopped_ReplicaPromoted_no_data_loss to simulate the case reproduced in #20118.

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Summary by CodeRabbit

  • Bug Fixes

    • Preserve checkpoint and sequence-number metadata during segment replication copy and primary promotion to prevent metadata loss.
  • Tests

    • Added an integration test that simulates primary shutdown, replica promotion, coordinated timing, and injected transport failures to validate no data loss on promotion.
  • New Features

    • Exposed a shard-level accessor for the last refreshed checkpoint; copy-state now reads and constrains max-sequence metadata accordingly.


Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
@coderabbitai
Contributor

coderabbitai bot commented Dec 2, 2025

Walkthrough

Exposes InternalEngine.lastRefreshedCheckpoint via IndexShard.getLastRefreshedCheckpoint, embeds that checkpoint into CopyState's SegmentInfos snapshot (clamping MAX_SEQ_NO), and adds an integration test SegmentReplicationPrimaryPromotionIT with mock engines and transport hooks to validate replica promotion without data loss.

Changes

Cohort / File(s) Summary
Integration test + mocks
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
New integration test class adding MockEnginePlugin, MockEngineFactory, MockInternalEngine, MockNRTReplicationEngine, public synchronization hooks (lockEnable, indexLuceneLatch, flushLatch, refreshLatch), and test testPrimaryStopped_ReplicaPromoted_no_data_loss() that coordinates concurrent writes, injects a mock transport exception during checkpoint publish, stops the primary, promotes the replica, and asserts no data loss.
IndexShard API
server/src/main/java/org/opensearch/index/shard/IndexShard.java
Adds import for InternalEngine and a new public method getLastRefreshedCheckpoint() that casts the shard engine to InternalEngine and returns lastRefreshedCheckpoint().
CopyState snapshot logic
server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
CopyState now reads lastRefreshedCheckpoint, clones SegmentInfos into a snapshot, clamps MAX_SEQ_NO in the snapshot to min(current MAX_SEQ_NO, lastRefreshedCheckpoint), and writes the modified snapshot into the CopyState bytes used for segment replication initialization.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Inspect mock engine and transport hook implementations for thread-safety, latch/lock correctness, and clean shutdown.
  • Verify SegmentInfos cloning and MAX_SEQ_NO clamping are serialized correctly and maintain on-disk compatibility.
  • Confirm IndexShard.getLastRefreshedCheckpoint() cast is safe across engine implementations and intended for segment-replication initialization.


Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 20.00%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.
✅ Passed checks (2 passed)
  • Title check ✅ Passed: The title clearly and concisely summarizes the main change, addressing data loss in vanilla segment replication, which matches the core objective of the PR.
  • Description check ✅ Passed: The PR description comprehensively addresses all required template sections with a detailed technical explanation of the changes, the related issue, rationale, test coverage, and compliance with contribution guidelines.

@github-actions
Contributor

github-actions bot commented Dec 2, 2025

❌ Gradle check result for aae28f2: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

138-139: Guard getLastRefreshedCheckpoint() against non‑InternalEngine engines

getLastRefreshedCheckpoint() unconditionally casts Engine to InternalEngine. If a shard ever runs with a different Engine implementation (e.g., a custom EngineFactory, ReadOnlyEngine, or future engine type) and this method is called, it will fail with a ClassCastException (assert only catches it in tests).

Consider adding an explicit runtime check and a clearer failure mode, e.g.:

-    public long getLastRefreshedCheckpoint() {
-        Engine engine = getEngine();
-        assert engine instanceof InternalEngine;
-        return ((InternalEngine) engine).lastRefreshedCheckpoint();
-    }
+    public long getLastRefreshedCheckpoint() {
+        final Engine engine = getEngine();
+        if (engine instanceof InternalEngine internalEngine) {
+            return internalEngine.lastRefreshedCheckpoint();
+        }
+        throw new IllegalStateException(
+            "lastRefreshedCheckpoint is only available for InternalEngine but found [" + engine.getClass().getName() + "]"
+        );
+    }

This keeps the assumption explicit and produces a clearer failure mode than a raw ClassCastException if the invariant is ever violated.

Also applies to: 4028-4033

server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)

34-38: Static test coordination state should be reset for future extensibility

The static coordination state:

public static boolean lockEnable = false;
public static CountDownLatch indexLuceneLatch = new CountDownLatch(1);
public static CountDownLatch flushLatch = new CountDownLatch(1);
public static CountDownLatch refreshLatch = new CountDownLatch(1);

combined with the mocked engines is fine for a single test, but it will behave oddly if additional tests are ever added to this class (latches already at zero, lockEnable left true, etc.).

Consider resetting all of these in a @Before (or @After) method, e.g. reassigning new CountDownLatch(1) instances and lockEnable = false, so each test starts from a known clean state.

Also applies to: 53-100


39-42: Ensure the @Before setup method actually runs

setup() is declared private:

@Before
private void setup() {
    internalCluster().startClusterManagerOnlyNode();
}

Depending on the JUnit/OpenSearch test harness configuration, private @Before methods may not be discovered. If that happens, this setup will silently be skipped and the test will rely on default cluster behaviour.

To be safe and consistent with typical JUnit usage, make this method public (and optionally rename to setUp):

-    @Before
-    private void setup() {
+    @Before
+    public void setup() {
         internalCluster().startClusterManagerOnlyNode();
     }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2b6d266 and aae28f2.

📒 Files selected for processing (3)
  • server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (2 hunks)
  • server/src/main/java/org/opensearch/indices/replication/common/CopyState.java (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
server/src/main/java/org/opensearch/index/engine/InternalEngine.java (1)
  • InternalEngine (143-3057)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: Analyze (java)
  • GitHub Check: Mend Security Check

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)

109-163: Ensure test actually runs and cannot hang indefinitely

A few issues in the main test method impact its effectiveness and robustness:

  1. Missing @Test annotation

The method testPrimaryStopped_ReplicaPromoted_no_data_loss is not annotated with @Test. Under JUnit 4 (as used by OpenSearchIntegTestCase), this will typically not be discovered as a test method. Add the annotation and import:

-import org.junit.Before;
+import org.junit.Before;
+import org.junit.Test;
...
-    // Used to test that primary promotion does not result in data loss.
-    public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
+    // Used to test that primary promotion does not result in data loss.
+    @Test
+    public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {

This is critical because this IT is the primary guard against the data‑loss scenario.

  2. Unbounded waits on latches and join() can hang the suite (previously flagged)

flushLatch.await(), refreshLatch.await(), and writeThread.join() are all unbounded. If the mocked engine or replication flow regresses, the test will hang indefinitely and stall the IT suite.

You can mitigate this with bounded waits and assertions:

-        flushLatch.await();
+        assertTrue("flushLatch timed out", flushLatch.await(30, TimeUnit.SECONDS));
...
-        refreshLatch.await();
-        writeThread.join();
+        assertTrue("refreshLatch timed out", refreshLatch.await(30, TimeUnit.SECONDS));
+        writeThread.join(TimeUnit.SECONDS.toMillis(30));
+        assertFalse("writeThread is still alive", writeThread.isAlive());

(Requires import java.util.concurrent.TimeUnit; and using assertTrue/assertFalse from the base test class.)

  3. Mocked publish‑checkpoint exception is permanent; clarify intent (previously flagged in spirit)

The replica’s PublishCheckpointAction handler always throws a RemoteTransportException:

replicaTransportService.addRequestHandlingBehavior(
    PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
    (handler, request, channel, task) -> {
        throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
    }
);

If the intention is to simulate a permanent failure on publishing checkpoints, adding a short comment here would make that explicit. If the goal is to fail only the first attempt and allow retries to succeed, consider guarding this with a flag (e.g., an AtomicBoolean fail‑once pattern) so later attempts go through.


160-161: Use assertEquals for clearer hit‑count assertion (already suggested in earlier review)

The assertion:

assertTrue(response.getHits().getTotalHits().value() == 2);

works but produces a less helpful failure message. Prefer assertEquals with the correct long literal:

-        assertTrue(response.getHits().getTotalHits().value() == 2);
+        assertEquals(2L, response.getHits().getTotalHits().value());

This will show expected vs. actual values directly on failure.

🧹 Nitpick comments (1)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1)

41-44: Static latch and flag state should be reset for test isolation

lockEnable and the CountDownLatch instances are public static and never reset. This is fine for a single test today, but will make the class fragile if additional tests are added or if the class is reused, since stale latch state (count already at 0, lockEnable left true) will change the control‑flow assumptions.

Consider reinitializing these in a @Before/@After method (or localizing them to the test) and, if they remain shared across threads, making lockEnable volatile to guarantee visibility.


📥 Commits

Reviewing files that changed from the base of the PR and between aae28f2 and 88ba97c.

📒 Files selected for processing (1)
  • server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (4)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (4)

47-50: JUnit lifecycle method should not be private.

With JUnit 4, methods annotated with @Before must be public; declaring private void setup() can cause the runner to treat it as an invalid lifecycle method and fail or ignore it. That would prevent the cluster-manager-only node from being started as intended.

Change the signature to be public:

     @Before
-    private void setup() {
+    public void setup() {
         internalCluster().startClusterManagerOnlyNode();
     }

80-92: Guard engine-level latch wait with a timeout and handle interruption explicitly.

MockInternalEngine.generateSeqNoForOperationOnPrimary blocks on indexLuceneLatch.await() without a timeout and wraps all Exceptions in a RuntimeException. If the latch is never released (e.g., test regression, early failure), this will block the engine thread indefinitely and hang the test run.

Add a bounded wait with an assertion:

                 if (lockEnable) {
                     flushLatch.countDown();
-                    indexLuceneLatch.await();
+                    if (!indexLuceneLatch.await(30, TimeUnit.SECONDS)) {
+                        throw new AssertionError("indexLuceneLatch timed out");
+                    }
                 }
-            } catch (Exception e) {
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 throw new RuntimeException(e);
             }

120-144: Add timeout to thread join and capture exceptions from the write thread.

Two issues with the write thread:

  1. Line 144: Unbounded writeThread.join() can hang the test
    If the write thread encounters an unexpected failure or deadlock, this will block indefinitely. Add a timeout:

    -        writeThread.join();
    +        writeThread.join(TimeUnit.SECONDS.toMillis(30));
    +        assertFalse("writeThread still alive after timeout", writeThread.isAlive());
  2. Line 120: Exceptions thrown inside the lambda are silently lost
    If client().prepareIndex() throws an exception (e.g., rejection, timeout), the test cannot detect it and may incorrectly pass. Capture and propagate exceptions:

    +        AtomicReference<Exception> writeError = new AtomicReference<>();
            Thread writeThread = new Thread(() -> {
    -            client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get();
    +            try {
    +                client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get();
    +            } catch (Exception e) {
    +                writeError.set(e);
    +            }
            });
            writeThread.start();
            ...
            writeThread.join(TimeUnit.SECONDS.toMillis(30));
    +        assertNull("Write thread failed: " + writeError.get(), writeError.get());

133-138: Clarify whether checkpoint publish should fail permanently or transiently.

The mock transport exception is thrown unconditionally for all PublishCheckpointAction requests throughout the entire test. If the intent is to simulate a transient network failure where retries eventually succeed, you should use a flag or counter to allow subsequent requests to pass.

If permanent failure is the intended scenario to validate data loss prevention, add a comment clarifying that all checkpoint publishes are expected to fail:

+        // Simulate permanent network partition: all checkpoint publishes to replica will fail
         replicaTransportService.addRequestHandlingBehavior(
             PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
             (handler, request, channel, task) -> {
                 throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
             }
         );

Otherwise, if retries should succeed after the first failure:

+        AtomicBoolean failOnce = new AtomicBoolean(true);
         replicaTransportService.addRequestHandlingBehavior(
             PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
             (handler, request, channel, task) -> {
+                if (failOnce.compareAndSet(true, false)) {
                     throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
+                }
+                handler.messageReceived(request, channel, task);
             }
         );

📥 Commits

Reviewing files that changed from the base of the PR and between 88ba97c and 1aa8d32.

📒 Files selected for processing (1)
  • server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
🔇 Additional comments (1)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1)

110-164: Test logic correctly validates data loss prevention during primary promotion.

The test effectively simulates the scenario:

  1. Writes are in-flight when a flush/refresh occurs
  2. Network partition prevents checkpoint propagation to replica
  3. Primary is stopped and replica is promoted
  4. The promoted replica should contain all committed data

Good improvements from previous review:

  • Lines 122, 143: Latch waits now have 30-second timeouts
  • Line 162: Uses assertEquals for clearer assertion messages

The test logic correctly validates that the promoted replica recovers both documents despite the simulated network partition.

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
@github-actions
Contributor

github-actions bot commented Dec 4, 2025

❌ Gradle check result for f4a712b: FAILURE


Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
@guojialiang92 guojialiang92 force-pushed the dev/fix-data-loss-in-vanilla-segment-replication branch from f4a712b to 9231a7f on December 5, 2025 00:43
@github-actions
Contributor

github-actions bot commented Dec 5, 2025

❌ Gradle check result for 939575a: FAILURE


@guojialiang92 guojialiang92 force-pushed the dev/fix-data-loss-in-vanilla-segment-replication branch from 939575a to f0584ca on December 5, 2025 07:39
@github-actions
Contributor

github-actions bot commented Dec 5, 2025

❌ Gradle check result for f0584ca: FAILURE


Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
@guojialiang92 guojialiang92 force-pushed the dev/fix-data-loss-in-vanilla-segment-replication branch from f0584ca to 4265d71 on December 5, 2025 09:17
@github-actions
Contributor

github-actions bot commented Dec 5, 2025

✅ Gradle check result for 4265d71: SUCCESS

@linuxpi
Contributor

linuxpi commented Dec 9, 2025

thanks for raising the PR @guojialiang92

maxSeqNo < lastRefreshedCheckpoint. The old primary shard has not yet performed a flush.

I could not understand in which case the above condition would be true?

Overall I am good with the change.

@guojialiang92
Contributor Author

guojialiang92 commented Dec 10, 2025

Thank you for your reply @linuxpi. :)

maxSeqNo < lastRefreshedCheckpoint. The old primary shard has not yet performed a flush.

i could not understand in which the above condition would be true?

Some context first: the max_seq_no here is retrieved from the userData of segmentInfosSnapshot during the initialization of CopyState, and it was recorded in the userData during the last commitIndexWriter. In other words, it is not the latest value recorded in InternalEngine#localCheckpointTracker.

If a refresh is invoked after commitIndexWriter, the refreshed checkpoint can advance past the committed value, which results in maxSeqNo < lastRefreshedCheckpoint.

To make it easier to understand, I will also add explanations to the above description.

@linuxpi
Contributor

linuxpi commented Dec 10, 2025

Thanks for the explanation @guojialiang92. It's clear now.

@linuxpi linuxpi merged commit d520f8d into opensearch-project:main Dec 10, 2025
37 checks passed
fdesu pushed a commit to fdesu/OpenSearch that referenced this pull request Dec 13, 2025
…pensearch-project#20150)

* Avoid data loss in vanilla segment replication.

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* If local segment replication is enabled, a force flush needs to be performed after translog recovery.

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* Add comments

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* put Math.min(maxSeqNo, lastRefreshedCheckpoint) in max_seq_no

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* Add comments

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* Add comments

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

---------

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
liuguoqingfz pushed a commit to liuguoqingfz/OpenSearch that referenced this pull request Dec 15, 2025
kaushalmahi12 pushed a commit to kaushalmahi12/OpenSearch that referenced this pull request Jan 8, 2026