
Introduce concurrent translog recovery to accelerate segment replication primary promotion #20251

Merged
guojialiang92 merged 19 commits into opensearch-project:main from guojialiang92:dev/support_concurrent_translog_recovery
Jan 27, 2026

Conversation


@guojialiang92 guojialiang92 commented Dec 16, 2025

Description

The purpose of this PR is to introduce the concurrent translog recovery mechanism described in #20131, which accelerates primary promotion under segment replication.

Default concurrency strategy

  • Concurrent translog recovery is restricted to the primary-promotion scenario of segment replication.
  • Introduce a dedicated thread pool, translog_recovery, with a size equal to the number of cores and an unbounded queue.
  • Introduce the cluster-level dynamic setting indices.translog_concurrent_recovery.enable, which enables concurrent translog recovery; it defaults to false.
  • Introduce the cluster-level dynamic setting indices.translog_concurrent_recovery.batch_size, the number of translog operations processed by a single thread; it defaults to 500,000.
  • When the number of translog recovery operations is less than indices.translog_concurrent_recovery.batch_size, concurrency is not enabled, and execution matches the current logic.
  • When the number of translog recovery operations exceeds indices.translog_concurrent_recovery.batch_size:
    • If translog_recovery is not busy, enable concurrency, with each thread executing the number of operations specified by indices.translog_concurrent_recovery.batch_size.
    • If translog_recovery is busy, do not enable concurrency, and execution matches the current logic.
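The gating rules above can be sketched as a single predicate. The setting names in the comments are the PR's; the class, method, and parameter names are illustrative, not the actual IndexShard code:

```java
// Sketch of the concurrency gate described above. Names are illustrative.
public class TranslogRecoveryGate {
    /**
     * Concurrency is used only when the feature flag is on, the shard uses
     * segment replication, the operation count exceeds one batch, and the
     * dedicated pool has no queued work.
     */
    static boolean useConcurrentRecovery(
        boolean concurrentRecoveryEnabled,   // indices.translog_concurrent_recovery.enable
        boolean segRepEnabled,               // shard uses segment replication / remote store
        long totalOperations,
        long batchSize,                      // indices.translog_concurrent_recovery.batch_size
        boolean poolQueueEmpty               // translog_recovery pool is not busy
    ) {
        return concurrentRecoveryEnabled && segRepEnabled && totalOperations > batchSize && poolQueueEmpty;
    }

    public static void main(String[] args) {
        // Below the batch size: stay on the sequential path.
        System.out.println(useConcurrentRecovery(true, true, 400_000, 500_000, true));   // false
        // Above the batch size with an idle pool: go concurrent.
        System.out.println(useConcurrentRecovery(true, true, 1_200_000, 500_000, true)); // true
        // A busy pool forces the sequential fallback.
        System.out.println(useConcurrentRecovery(true, true, 1_200_000, 500_000, false)); // false
    }
}
```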

Related Issues

Resolves #20131

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Summary by CodeRabbit

  • New Features

    • Added dynamic cluster settings to enable concurrent translog recovery and to configure batch size.
    • Introduced a dedicated translog-recovery thread pool and batched parallel translog recovery to speed shard recovery when conditions allow, with automatic fallback to the sequential path.
  • Tests

    • Added end-to-end tests validating concurrent translog recovery, primary promotion, replication, and translog/Lucene state consistency.


Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

coderabbitai bot commented Dec 16, 2025

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough

Walkthrough

Adds dynamic settings to enable batched concurrent translog recovery, registers them as cluster settings, introduces a dedicated TRANSLOG_RECOVERY thread pool, implements conditional parallel per-batch translog recovery in IndexShard (with sequential fallback), and updates tests and test-framework to exercise the new path.

Changes

Cohort / File(s) Summary
Settings & Cluster registration
server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java, server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
Add INDICES_TRANSLOG_CONCURRENT_RECOVERY_ENABLE (boolean) and INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE (long): fields, accessors, mutators, update consumers, and include them in built-in cluster settings.
Thread pool
server/src/main/java/org/opensearch/threadpool/ThreadPool.java
Add Names.TRANSLOG_RECOVERY, map it to ThreadPoolType.FIXED, and register a FixedExecutorBuilder for TRANSLOG_RECOVERY (size = allocated processors).
Parallel recovery implementation
server/src/main/java/org/opensearch/index/shard/IndexShard.java
Implement conditional parallel translog recovery: check settings and pool availability, split translog into batches, create per-batch snapshots, submit per-batch recovery tasks to TRANSLOG_RECOVERY collecting Future<Integer>, aggregate counts/exceptions, ensure snapshot cleanup; fall back to sequential recovery when not applicable.
Test framework
test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java
Add addReplica(Path remotePath, RecoverySettings recoverySettings) overload and use it in ReplicationGroup to create replicas with specific RecoverySettings and merged segment publisher.
Integration test
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java
Add testPrimaryPromotionWithConcurrentTranslogRecovery() and a shared helper doPrimaryPromotion(...) to exercise primary/replica promotion with concurrent translog recovery and merged segment publisher.
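The per-batch pattern summarized above (submit batches, collect Future&lt;Integer&gt;, aggregate counts and exceptions) can be sketched with plain JDK executors. BatchAggregation, recoverAll, and the batch callables are illustrative stand-ins for the IndexShard logic, not the PR's actual code:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BatchAggregation {
    // Each batch returns a recovered-op count; failures are folded into one
    // IOException, with additional causes attached as suppressed exceptions.
    static int recoverAll(ExecutorService pool, List<Callable<Integer>> batches) throws IOException {
        List<Future<Integer>> futures = new ArrayList<>();
        for (Callable<Integer> batch : batches) {
            futures.add(pool.submit(batch));
        }
        Exception failure = null;
        int totalRecovered = 0;
        for (Future<Integer> future : futures) {
            try {
                totalRecovered += future.get();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw new IOException("exception during concurrent translog recovery", failure);
        }
        return totalRecovered;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Callable<Integer>> batches = List.of(() -> 500_000, () -> 500_000, () -> 123);
            System.out.println(recoverAll(pool, batches)); // 1000123
        } finally {
            pool.shutdown();
        }
    }
}
```

Waiting on every future before rethrowing ensures no batch is still writing when the aggregated exception propagates.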

Sequence Diagram

sequenceDiagram
    participant Initiator as Recovery initiator
    participant IndexShard
    participant RecoverySettings
    participant TranslogMgr as TranslogManager
    participant ThreadPool as TRANSLOG_RECOVERY
    participant Engine

    Initiator->>IndexShard: start translog recovery
    IndexShard->>RecoverySettings: check concurrent enabled & batchSize
    alt concurrent enabled & pool available & ops > batchSize
        IndexShard->>TranslogMgr: obtain full translog snapshot
        IndexShard->>IndexShard: compute batch boundaries
        loop per batch
            IndexShard->>ThreadPool: submit batch recovery task (snapshot slice)
            ThreadPool->>Engine: apply batch operations
            Engine-->>ThreadPool: result / exception
            ThreadPool-->>IndexShard: Future completes
            IndexShard->>TranslogMgr: close per-batch snapshot
        end
        alt any failure
            IndexShard->>Initiator: throw aggregated IOException
        else
            IndexShard->>Initiator: return total recovered ops
        end
    else fallback
        IndexShard->>TranslogMgr: single-snapshot sequential recovery
        TranslogMgr->>Engine: apply ops serially
        Engine-->>IndexShard: recovery complete
        IndexShard->>Initiator: return recovered ops
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review concurrency, Future aggregation, and exception consolidation in IndexShard.java.
  • Verify per-batch translog snapshot lifecycle and finally-block cleanup.
  • Confirm correct thread pool registration, sizing, and ThreadPoolType mapping in ThreadPool.java.
  • Validate dynamic settings registration, update consumers, and inclusion in ClusterSettings.java.
  • Check new tests and test-framework overload for correctness and flakiness.

Suggested labels

enhancement, Indexing:Replication

Suggested reviewers

  • msfroh
  • dbwiddis
  • cwperks
  • mch2
  • reta
  • sachinpkale
  • anasalkouz
  • gbbafna
  • shwetathareja
  • kotwanikunal
  • saratvemulapalli
  • jed326

Poem

🐰 I split the logs in batches bright,
Threads hopped in parallel light,
Snapshots closed with tidy care,
Counts summed up, exceptions snared,
A rabbit cheers — recovery's right!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 17.65%, which is below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title check (✅ Passed): The title clearly and concisely describes the main change, introducing concurrent translog recovery to accelerate segment replication primary promotion, which aligns with the core purpose of the PR.
  • Description check (✅ Passed): The description includes all required sections: Description (explaining the purpose and default concurrency strategy in detail), Related Issues (resolving #20131), and completed checklist items indicating testing, API changes, and documentation were addressed.

Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)

718-802: Well-structured test for concurrent translog recovery.

The test adequately exercises the concurrent translog recovery path with 200-300 documents and a batch size of 10, creating 20-30 concurrent batches. The assertions comprehensively verify engine types, document counts, translog statistics, and history consistency.

One observation: the batch size of 10 is significantly smaller than the production default of 50,000. While this is intentional for testing, consider adding a brief comment explaining this choice to help future maintainers understand the test design.

+    // Use a small batch size to ensure multiple concurrent batches are created during recovery
     @TestLogging(reason = "Getting trace logs from IndexShard", value = "org.opensearch.index.shard.IndexShard:TRACE")
     public void testPrimaryPromotionWithConcurrentTranslogRecovery() throws Exception {
         final RecoverySettings recoverySettings = new RecoverySettings(
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e798353 and ac424e3.

📒 Files selected for processing (6)
  • server/src/main/java/org/opensearch/common/settings/ClusterSettings.java (1 hunks)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (4 hunks)
  • server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (5 hunks)
  • server/src/main/java/org/opensearch/threadpool/ThreadPool.java (3 hunks)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
  • test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: Analyze (java)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
🔇 Additional comments (11)
test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java (2)

314-316: LGTM - Constructor correctly uses new overload with RecoverySettings.

The constructor now properly passes recoverySettings to replicas, enabling concurrent translog recovery testing scenarios.


472-487: LGTM - New overload properly supports RecoverySettings injection.

The new addReplica(Path, RecoverySettings) overload correctly constructs an IndexShard with the provided RecoverySettings, enabling tests to exercise concurrent translog recovery paths.

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java (1)

349-350: LGTM - New cluster settings properly registered.

The concurrent translog recovery settings are correctly added to BUILT_IN_CLUSTER_SETTINGS, enabling dynamic configuration updates.

server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)

38-40: LGTM - Necessary imports added for test infrastructure.

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (4)

269-270: LGTM - Thread-safe volatile fields for concurrent access.


306-307: LGTM - Settings properly initialized from constructor.


347-351: LGTM - Dynamic setting update consumers properly registered.


521-535: LGTM - Standard accessor pattern with private setters.

The public getters and private setters follow the established pattern for cluster settings in this class.

server/src/main/java/org/opensearch/threadpool/ThreadPool.java (3)

102-102: LGTM - New thread pool name constant added.


183-183: LGTM - Thread pool type correctly mapped to FIXED.

A FIXED pool type is appropriate for translog recovery to provide bounded parallelism.


263-263: Verify unbounded queue is intentional for TRANSLOG_RECOVERY pool.

The queue size of -1 creates an unbounded queue. During large translog recoveries, this could lead to memory pressure if many recovery tasks are queued. Consider whether a bounded queue with a reasonable limit would be more appropriate, similar to other fixed pools like WRITE (queue size 10000) or TRANSLOG_SYNC (queue size 10000).

If unbounded is intentional (e.g., because concurrent recovery is already gated by settings), please document this design choice.
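The trade-off flagged here can be demonstrated with a plain ThreadPoolExecutor; QueueBounds and its method names are illustrative, not OpenSearch code. A bounded queue rejects excess work once the queue and workers are saturated, while an unbounded LinkedBlockingQueue (queue size -1 in OpenSearch terms) accepts everything and can build memory pressure instead:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueBounds {
    // A 1-thread pool with queue capacity 1: the first task occupies the
    // worker, the second fills the queue, and the third is rejected.
    static boolean boundedPoolRejectsExcessWork() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor bounded = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        bounded.execute(blocker); // taken directly by the single worker
        bounded.execute(blocker); // queued
        boolean rejected = false;
        try {
            bounded.execute(blocker); // queue full, no spare worker
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        release.countDown();
        bounded.shutdown();
        return rejected;
    }

    // An unbounded queue never rejects; work only accumulates in memory.
    static boolean unboundedPoolQueuesEverything(int tasks) {
        ThreadPoolExecutor unbounded = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < tasks; i++) {
            unbounded.execute(() -> {});
        }
        unbounded.shutdown();
        return true; // no RejectedExecutionException was thrown
    }

    public static void main(String[] args) {
        System.out.println(boundedPoolRejectsExcessWork());        // true
        System.out.println(unboundedPoolQueuesEverything(10_000)); // true
    }
}
```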

@github-actions

❌ Gradle check result for ac424e3: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

5281-5336: Concurrent translog recovery runner looks solid; consider loosening InternalTranslogManager coupling and documenting assumptions

The new TranslogRecoveryRunner implementation in resetEngineToGlobalCheckpoint() generally looks correct and addresses earlier issues:

  • Snapshots created for each batch are now always closed in finally via IOUtils.closeWhileHandlingException, so there is no longer a leak if any task fails.
  • Batch loop bounds use a standard ceil‑division pattern, avoiding the extra empty batch when totalOperations is an exact multiple of batchSize.
  • Exceptions from individual futures are aggregated into a single IOException, preserving all suppressed causes while still returning the total recovered op count on success.
  • Concurrency is conservatively gated on:
    • recoverySettings.isTranslogConcurrentRecoveryEnable(),
    • indexSettings.isSegRepEnabledOrRemoteNode(),
    • totalOperations > batchSize, and
    • the TRANSLOG_RECOVERY executor queue being empty,
      so you fall back to the well‑tested sequential path when conditions aren’t favorable.

A few follow‑ups worth considering:

  1. Avoid hard dependency on InternalTranslogManager where possible.
    The assert translogManager instanceof InternalTranslogManager; and the downcast to call getTranslog().newSnapshot(start, end) tie this path to a specific implementation. If you expect other TranslogManager implementations (plugins, future remote variants), it might be cleaner to:

    • either expose a newSnapshot(long fromSeqNo, long toSeqNo) style API on TranslogManager, or
    • guard the concurrent path with an instanceof and fall back to sequential recovery when the manager is not InternalTranslogManager.
      Right now the assert only fires with assertions enabled; in production we’d get a ClassCastException instead.
  2. Clarify assumptions around partitioning by seq_no.
    The batching logic derives [start, end] ranges from engine.getProcessedLocalCheckpoint() and batchSize, and uses snapshot.totalOperations() purely to compute the number of batches. This relies on translog invariants (strictly increasing seq_nos, no concurrent appends because operations are blocked during resetEngineToGlobalCheckpoint) so that the union of your per‑batch snapshots matches the original recovery range. A short comment here explaining that assumption would help future maintainers understand why it’s safe to ignore the passed snapshot other than for totalOperations().

  3. TRANSLOG_RECOVERY executor assumption.
    The cast to OpenSearchThreadPoolExecutor and queue emptiness check are fine with today’s FixedExecutorBuilder setup for ThreadPool.Names.TRANSLOG_RECOVERY, but they implicitly assume that name will always map to an OpenSearchThreadPoolExecutor. If that ever changes, a runtime ClassCastException will be thrown. A brief comment noting the expectation (or an instanceof guard that falls back to sequential recovery if the cast fails) would make this more robust.

If you’re comfortable with the current constraints (internal manager only and fixed executor type), this can merge as‑is; otherwise the above refactors are low‑risk and can be done now or in a follow‑up.
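The partitioning assumption in point 2 can be sketched with JDK-only arithmetic. BatchRanges and Range are illustrative names; the ceil division and the open-ended last batch mirror the logic described in this review:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchRanges {
    record Range(long start, long end) {}

    // Inclusive [start, end] seq_no windows beginning after the processed
    // local checkpoint; the last batch is left open-ended (Long.MAX_VALUE)
    // so the tail of the translog is always covered.
    static List<Range> computeBatches(long localCheckpoint, long totalOperations, long batchSize) {
        final long batches = (totalOperations + batchSize - 1) / batchSize; // ceil division
        List<Range> ranges = new ArrayList<>();
        for (long i = 0; i < batches; i++) {
            long start = localCheckpoint + 1 + i * batchSize;
            long end = (i == batches - 1) ? Long.MAX_VALUE : start + batchSize - 1;
            ranges.add(new Range(start, end));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 25 ops after checkpoint 99, batch size 10 -> 3 batches.
        for (Range r : computeBatches(99, 25, 10)) {
            System.out.println(r);
        }
        // An exact multiple produces no empty trailing batch.
        System.out.println(computeBatches(99, 20, 10).size()); // 2
    }
}
```

This only holds under the invariants the review names: strictly increasing seq_nos and no concurrent appends while the engine is being reset.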

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ac424e3 and ad8564b.

📒 Files selected for processing (2)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (4 hunks)
  • server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (3)
server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java (1)
  • InternalTranslogManager (38-480)
server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1)
  • Names (99-130)
libs/common/src/main/java/org/opensearch/common/util/io/IOUtils.java (1)
  • IOUtils (58-317)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (19)
  • GitHub Check: gradle-check
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: Analyze (java)
  • GitHub Check: detect-breaking-change
🔇 Additional comments (2)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

106-107: New imports align with concurrent translog recovery usage

The added imports (OpenSearchThreadPoolExecutor, InternalTranslogManager, TranslogManager, Future) are consistent with the new concurrent recovery runner and TRANSLOG_RECOVERY thread pool usage; no issues here.

Also applies to: 187-195, 246-247

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (1)

233-246: Concurrent translog recovery settings wiring is correct; clarify intended default batch size

The settings, backing fields, constructor initialization, and dynamic update consumers are all consistent and thread‑safe; min 1 on INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE prevents division-by-zero.

The same batch_size setting serves dual purposes in IndexShard:

  • Threshold for switching from single‑threaded to concurrent recovery (totalOperations > batchSize, line 5290)
  • Number of batches (one task per thread), calculated as (totalOperations + batchSize - 1) / batchSize, with each batch containing up to batchSize operations (line 5296)

This dual-use is correct but worth documenting: tuning the batch size affects both when concurrency activates and how work is divided per thread. If separate tuning of "concurrency threshold" vs "operations per thread" is needed later, consider splitting into two settings.

Please confirm the intended default of 50,000 operations for this batch size.

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

5279-5332: Make concurrent translog recovery robust to snapshot/submit failures and tighten batch logic

The concurrent TranslogRecoveryRunner is close, but there are two important issues:

  1. Snapshot leak on failures in the creation loop

    If either:

    • engine.translogManager().newChangesSnapshot(start, end, false) throws, or
    • threadPool.executor(...).submit(...) throws (e.g., RejectedExecutionException during shutdown),

    the method exits before reaching the second loop that closes snapshots. Any snapshots created in prior iterations of the loop are never closed, leaking file descriptors until shard/engine shutdown.

    To fix this, wrap the batch creation + execution phase in a try/finally that always closes every Translog.Snapshot you created, and explicitly close the snapshot if submit() fails before it’s added to the list.

  2. Cleaner last-batch detection

    The last-batch check i == totalOperations / batchSize relies on the same arithmetic as the loop bound but is harder to read and easy to get wrong during future changes. Precomputing the batch count and checking against batches - 1 is clearer and matches the ceil formula used in the loop bound.

A minimally invasive refactor that addresses both points:

-        final TranslogRecoveryRunner translogRunner = (snapshot) -> {
-            Engine engine = newEngineReference.get();
-            assert null != engine;
-            int totalOperations = snapshot.totalOperations();
-            long batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
-            boolean threadPoolNotBusy = ((OpenSearchThreadPoolExecutor) threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)).getQueue()
-                .isEmpty();
-            if (recoverySettings.isTranslogConcurrentRecoveryEnable()
-                && indexSettings.isSegRepEnabledOrRemoteNode()
-                && totalOperations > batchSize
-                && threadPoolNotBusy) {
-                long localCheckpoint = engine.getProcessedLocalCheckpoint();
-                List<Tuple<Future<Integer>, Translog.Snapshot>> translogSnapshotsFutureList = new ArrayList<>();
-                for (int i = 0; i < (totalOperations + batchSize - 1) / batchSize; i++) {
-                    long start = localCheckpoint + 1 + (long) i * batchSize;
-                    long end = (i == totalOperations / batchSize) ? Long.MAX_VALUE : start + batchSize - 1;
-                    Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false);
-                    translogSnapshotsFutureList.add(
-                        new Tuple<>(
-                            threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
-                                .submit(() -> runTranslogRecovery(engine, translogSnapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                                    // TODO: add a dedicate recovery stats for the reset translog
-                                })),
-                            translogSnapshot
-                        )
-                    );
-                }
-                Exception exception = null;
-                int totalRecovered = 0;
-                for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
-                    try {
-                        int recoveredOps = translogSnapshotFuture.v1().get();
-                        totalRecovered += recoveredOps;
-                    } catch (Exception e) {
-                        if (exception == null) {
-                            exception = e;
-                        } else {
-                            exception.addSuppressed(e);
-                        }
-                    } finally {
-                        Translog.Snapshot translogSnapshot = translogSnapshotFuture.v2();
-                        IOUtils.closeWhileHandlingException(translogSnapshot);
-                    }
-                }
-                if (exception != null) {
-                    throw new IOException("generate exception when concurrent translog recovery", exception);
-                }
-                return totalRecovered;
-            } else {
-                return runTranslogRecovery(newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                    // TODO: add a dedicate recovery stats for the reset translog
-                });
-            }
-        };
+        final TranslogRecoveryRunner translogRunner = (snapshot) -> {
+            Engine engine = newEngineReference.get();
+            assert engine != null;
+            final int totalOperations = snapshot.totalOperations();
+            final long batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
+            final boolean threadPoolNotBusy = ((OpenSearchThreadPoolExecutor) threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY))
+                .getQueue()
+                .isEmpty();
+
+            if (recoverySettings.isTranslogConcurrentRecoveryEnable()
+                && indexSettings.isSegRepEnabledOrRemoteNode()
+                && totalOperations > batchSize
+                && threadPoolNotBusy) {
+                final long localCheckpoint = engine.getProcessedLocalCheckpoint();
+                final long batches = (totalOperations + batchSize - 1) / batchSize;
+                final List<Tuple<Future<Integer>, Translog.Snapshot>> translogSnapshotsFutureList = new ArrayList<>();
+                Exception exception = null;
+                int totalRecovered = 0;
+
+                try {
+                    for (long i = 0; i < batches; i++) {
+                        final long start = localCheckpoint + 1 + i * batchSize;
+                        final long end = (i == batches - 1) ? Long.MAX_VALUE : start + batchSize - 1;
+                        final Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false);
+                        try {
+                            Future<Integer> future = threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
+                                .submit(
+                                    () -> runTranslogRecovery(
+                                        engine,
+                                        translogSnapshot,
+                                        Engine.Operation.Origin.LOCAL_RESET,
+                                        () -> {
+                                            // TODO: add a dedicate recovery stats for the reset translog
+                                        }
+                                    )
+                                );
+                            translogSnapshotsFutureList.add(new Tuple<>(future, translogSnapshot));
+                        } catch (RuntimeException e) {
+                            IOUtils.closeWhileHandlingException(translogSnapshot);
+                            throw e;
+                        }
+                    }
+
+                    for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
+                        try {
+                            int recoveredOps = translogSnapshotFuture.v1().get();
+                            totalRecovered += recoveredOps;
+                        } catch (Exception e) {
+                            if (exception == null) {
+                                exception = e;
+                            } else {
+                                exception.addSuppressed(e);
+                            }
+                        }
+                    }
+
+                    if (exception != null) {
+                        throw new IOException("exception during concurrent translog recovery", exception);
+                    }
+                    return totalRecovered;
+                } finally {
+                    for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
+                        IOUtils.closeWhileHandlingException(translogSnapshotFuture.v2());
+                    }
+                }
+            } else {
+                return runTranslogRecovery(newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
+                    // TODO: add a dedicate recovery stats for the reset translog
+                });
+            }
+        };

This keeps the overall behavior and conditions intact, but ensures snapshots are always closed and makes the batch structure easier to reason about.

🧹 Nitpick comments (2)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2)

777-781: Consider verifying that concurrent recovery was actually used.

While the test correctly configures concurrent translog recovery and verifies the end result, it doesn't explicitly verify that the concurrent code path was exercised. Consider adding assertions or logging to confirm that concurrent recovery actually occurred (e.g., checking thread pool activity or recovery metrics).
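One JDK-only way to observe pool activity, assuming the test can reach the executor, is to compare the pool's completed-task count before and after recovery. PoolActivityCheck and runBatches are hypothetical names, not the test's current assertions:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolActivityCheck {
    // Submits the given number of no-op "batch" tasks and returns how many
    // the pool completed; a non-zero delta would confirm the concurrent
    // path actually ran through the executor.
    static long runBatches(int batches) throws InterruptedException {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        for (int i = 0; i < batches; i++) {
            pool.execute(() -> {}); // stand-in for a per-batch recovery task
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runBatches(20)); // 20, e.g. 200 docs / batch size 10
    }
}
```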


717-800: Consider extracting common logic to reduce duplication.

This test method shares significant logic with testNRTReplicaPromotedAsPrimary (lines 533-599). While the duplication is reasonable for testing different scenarios, consider extracting the common test flow into a helper method that accepts RecoverySettings and MergedSegmentPublisher as parameters. This would improve maintainability if the test logic needs to change in the future.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eff2f31 and cd7419c.

📒 Files selected for processing (2)
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java (3 hunks)
  • server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (2)
server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1)
  • Names (99-130)
server/src/main/java/org/opensearch/index/engine/Engine.java (1)
  • Operation (1503-1603)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
  • GitHub Check: gradle-check
  • GitHub Check: precommit (21, windows-2025, true)
  • GitHub Check: precommit (25, windows-latest)
  • GitHub Check: precommit (25, ubuntu-latest)
  • GitHub Check: precommit (21, macos-15)
  • GitHub Check: precommit (25, macos-15-intel)
  • GitHub Check: precommit (25, macos-15)
  • GitHub Check: precommit (25, ubuntu-24.04-arm)
  • GitHub Check: precommit (21, macos-15-intel)
  • GitHub Check: precommit (21, ubuntu-latest)
  • GitHub Check: precommit (21, windows-latest)
  • GitHub Check: precommit (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, windows-latest)
  • GitHub Check: assemble (21, ubuntu-24.04-arm)
  • GitHub Check: assemble (25, ubuntu-latest)
  • GitHub Check: assemble (25, ubuntu-24.04-arm)
  • GitHub Check: assemble (21, ubuntu-latest)
  • GitHub Check: assemble (25, windows-latest)
  • GitHub Check: detect-breaking-change
  • GitHub Check: Analyze (java)
🔇 Additional comments (7)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (6)

38-38: LGTM!

The import is correctly added to support the new test method's usage of MergedSegmentPublisher.EMPTY.


718-724: LGTM!

The RecoverySettings configuration correctly enables concurrent translog recovery with a small batch size (10) to ensure the concurrent code path is exercised when larger document counts are indexed.


725-738: LGTM!

The replication group setup correctly passes the custom RecoverySettings and MergedSegmentPublisher.EMPTY to enable concurrent translog recovery testing.


740-752: LGTM!

The initial document indexing, replication, and assertions correctly verify the starting state before testing concurrent translog recovery.


773-775: LGTM!

The primary promotion logic correctly syncs the global checkpoint before promoting the replica.


783-798: LGTM!

The final assertions comprehensively verify engine types, document counts, translog state, and consistency across all shards after concurrent translog recovery.

server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

105-107: Imports for concurrent translog recovery look correct

OpenSearchThreadPoolExecutor and Future are appropriately imported and used in the new concurrent translog recovery logic; no issues here.

Also applies to: 243-245

@guojialiang92 guojialiang92 force-pushed the dev/support_concurrent_translog_recovery branch from 8f37578 to cc6738c Compare January 26, 2026 06:55
@github-actions
Contributor

❌ Gradle check result for cc6738c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
@guojialiang92 guojialiang92 force-pushed the dev/support_concurrent_translog_recovery branch from cc6738c to 905ccb3 Compare January 26, 2026 07:39
@github-actions
Contributor

❕ Gradle check result for 905ccb3: UNSTABLE

Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.

@guojialiang92
Contributor Author

Thank you for your rigorous review @atris.

Based on your review suggestions, I have made the following optimizations.

  1. I made IndexShard#translogConcurrentRecoverySemaphore static so the number of queued concurrent-recovery tasks is capped at 1000 node-wide; when the semaphore is exhausted, recovery falls back to serial processing.
  2. I capped the maximum value of RecoverySettings#INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE so that worst-case memory usage stays bounded, and added code comments explaining the limit.
  3. I moved translog snapshot initialization and closing into the task execution itself, so recovery no longer holds too many file descriptors open at once.
  4. I use ExecutorCompletionService to obtain completed tasks; when a task throws, the remaining futures are cancelled.
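The pattern in item 4 above (draining results through an ExecutorCompletionService and cancelling outstanding futures on the first failure) can be sketched as follows; the task body and batch numbers are hypothetical stand-ins, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionServiceSketch {
    // Hypothetical batch task: pretend batch 2 fails during replay.
    static int recoverBatch(int batchId) {
        if (batchId == 2) throw new RuntimeException("batch " + batchId + " failed");
        return batchId * 100; // pretend this many ops were replayed
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int batchId = i;
            futures.add(ecs.submit(() -> recoverBatch(batchId)));
        }
        int recovered = 0;
        boolean failed = false;
        for (int i = 0; i < futures.size(); i++) {
            try {
                // take() yields futures in completion order, so results are
                // consumed as soon as any batch finishes.
                recovered += ecs.take().get();
            } catch (ExecutionException e) {
                failed = true;
                // Cancel everything still outstanding on the first failure.
                for (Future<Integer> f : futures) f.cancel(true);
                break;
            }
        }
        pool.shutdownNow();
        System.out.println("failed=" + failed);
    }
}
```

Consuming futures in completion order keeps the caller from blocking on a slow batch while faster ones are already done, and the cancel-on-failure loop bounds wasted work.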

Could you please help review it again?

@guojialiang92 guojialiang92 requested a review from atris January 26, 2026 09:11
Contributor

@atris atris left a comment

LGTM

@github-actions
Contributor

❌ Gradle check result for 46ca835: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
@guojialiang92 guojialiang92 force-pushed the dev/support_concurrent_translog_recovery branch from 320c67a to 0cf9051 Compare January 27, 2026 06:23
@github-actions
Contributor

✅ Gradle check result for 0cf9051: SUCCESS

@guojialiang92 guojialiang92 merged commit 3688680 into opensearch-project:main Jan 27, 2026
34 checks passed
tanyabti pushed a commit to tanyabti/OpenSearch that referenced this pull request Feb 24, 2026
…ion primary promotion (opensearch-project#20251)

* support concurrent translog recovery

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* refactor

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* update

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* refactor test

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* add translogConcurrentRecoverySemaphore

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* use FutureUtils.cancel

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* use FutureUtils.cancel

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* Limit the maximum value of batch to 1 million

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* add UT

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* simplify snapshot close and fix test

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* add change log

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

* Add comments

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>

---------

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>