Introduce concurrent translog recovery to accelerate segment replication primary promotion #20251
Conversation
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
📝 Walkthrough

Adds dynamic settings to enable batched concurrent translog recovery, registers them as cluster settings, introduces a dedicated TRANSLOG_RECOVERY thread pool, implements conditional parallel per-batch translog recovery in IndexShard (with a sequential fallback), and updates tests and the test framework to exercise the new path.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Initiator as Recovery initiator
    participant IndexShard
    participant RecoverySettings
    participant TranslogMgr as TranslogManager
    participant ThreadPool as TRANSLOG_RECOVERY
    participant Engine
    Initiator->>IndexShard: start translog recovery
    IndexShard->>RecoverySettings: check concurrent enabled & batchSize
    alt concurrent enabled & pool available & ops > batchSize
        IndexShard->>TranslogMgr: obtain full translog snapshot
        IndexShard->>IndexShard: compute batch boundaries
        loop per batch
            IndexShard->>ThreadPool: submit batch recovery task (snapshot slice)
            ThreadPool->>Engine: apply batch operations
            Engine-->>ThreadPool: result / exception
            ThreadPool-->>IndexShard: Future completes
            IndexShard->>TranslogMgr: close per-batch snapshot
        end
        alt any failure
            IndexShard->>Initiator: throw aggregated IOException
        else
            IndexShard->>Initiator: return total recovered ops
        end
    else fallback
        IndexShard->>TranslogMgr: single-snapshot sequential recovery
        TranslogMgr->>Engine: apply ops serially
        Engine-->>IndexShard: recovery complete
        IndexShard->>Initiator: return recovered ops
    end
```
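The concurrent branch of the diagram is a fan-out/fan-in over an executor: compute batch boundaries by ceiling division, submit one task per batch, then wait on every future and aggregate results or failures. A minimal, self-contained Java sketch of that shape (hypothetical names such as `recoverBatch` and `recoverConcurrently`, not the PR's API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentRecoverySketch {
    // Stand-in for replaying one batch of translog operations in [start, end];
    // returns the number of operations "recovered" up to maxSeqNo.
    static int recoverBatch(long start, long endInclusive, long maxSeqNo) {
        return (int) (Math.min(endInclusive, maxSeqNo) - start + 1);
    }

    public static int recoverConcurrently(int totalOperations, long batchSize, long localCheckpoint)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long batches = (totalOperations + batchSize - 1) / batchSize; // ceil division
            long maxSeqNo = localCheckpoint + totalOperations;
            List<Future<Integer>> futures = new ArrayList<>();
            for (long i = 0; i < batches; i++) {
                final long start = localCheckpoint + 1 + i * batchSize;
                // Last batch is open-ended, mirroring the Long.MAX_VALUE upper bound.
                final long end = (i == batches - 1) ? Long.MAX_VALUE : start + batchSize - 1;
                futures.add(pool.submit((Callable<Integer>) () -> recoverBatch(start, end, maxSeqNo)));
            }
            int total = 0;
            Exception failure = null;
            for (Future<Integer> f : futures) {      // fan-in: wait for every batch
                try {
                    total += f.get();
                } catch (ExecutionException e) {
                    if (failure == null) failure = e; else failure.addSuppressed(e);
                }
            }
            if (failure != null) throw failure;      // aggregated failure, as in the diagram
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```

With 25 operations and a batch size of 10 this creates three batches (two full, one open-ended), and the recovered counts sum back to 25.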
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches: ❌ 1 failed check (warning), ✅ 2 passed checks.
Actionable comments posted: 2
🧹 Nitpick comments (1)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)
718-802: Well-structured test for concurrent translog recovery. The test adequately exercises the concurrent translog recovery path with 200-300 documents and a batch size of 10, creating 20-30 concurrent batches. The assertions comprehensively verify engine types, document counts, translog statistics, and history consistency.
One observation: the batch size of 10 is significantly smaller than the production default of 50,000. While this is intentional for testing, consider adding a brief comment explaining this choice to help future maintainers understand the test design.
```java
+ // Use a small batch size to ensure multiple concurrent batches are created during recovery
@TestLogging(reason = "Getting trace logs from IndexShard", value = "org.opensearch.index.shard.IndexShard:TRACE")
public void testPrimaryPromotionWithConcurrentTranslogRecovery() throws Exception {
    final RecoverySettings recoverySettings = new RecoverySettings(
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- server/src/main/java/org/opensearch/common/settings/ClusterSettings.java (1 hunk)
- server/src/main/java/org/opensearch/index/shard/IndexShard.java (4 hunks)
- server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (5 hunks)
- server/src/main/java/org/opensearch/threadpool/ThreadPool.java (3 hunks)
- server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
- test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (20)
- GitHub Check: gradle-check
- GitHub Check: assemble (21, windows-latest)
- GitHub Check: assemble (21, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-latest)
- GitHub Check: assemble (25, ubuntu-24.04-arm)
- GitHub Check: assemble (21, ubuntu-24.04-arm)
- GitHub Check: assemble (25, windows-latest)
- GitHub Check: detect-breaking-change
- GitHub Check: precommit (21, windows-2025, true)
- GitHub Check: Analyze (java)
- GitHub Check: precommit (25, macos-15-intel)
- GitHub Check: precommit (21, macos-15)
- GitHub Check: precommit (21, ubuntu-24.04-arm)
- GitHub Check: precommit (21, ubuntu-latest)
- GitHub Check: precommit (25, macos-15)
- GitHub Check: precommit (25, windows-latest)
- GitHub Check: precommit (25, ubuntu-latest)
- GitHub Check: precommit (21, windows-latest)
- GitHub Check: precommit (21, macos-15-intel)
- GitHub Check: precommit (25, ubuntu-24.04-arm)
🔇 Additional comments (11)
test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java (2)
314-316: LGTM - Constructor correctly uses new overload with RecoverySettings. The constructor now properly passes `recoverySettings` to replicas, enabling concurrent translog recovery testing scenarios.
472-487: LGTM - New overload properly supports RecoverySettings injection. The new `addReplica(Path, RecoverySettings)` overload correctly constructs an `IndexShard` with the provided `RecoverySettings`, enabling tests to exercise concurrent translog recovery paths.

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java (1)
349-350: LGTM - New cluster settings properly registered. The concurrent translog recovery settings are correctly added to `BUILT_IN_CLUSTER_SETTINGS`, enabling dynamic configuration updates.

server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (1)
38-40: LGTM - Necessary imports added for test infrastructure.

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (4)
269-270: LGTM - Thread-safe volatile fields for concurrent access.
306-307: LGTM - Settings properly initialized from constructor.
347-351: LGTM - Dynamic setting update consumers properly registered.
521-535: LGTM - Standard accessor pattern with private setters. The public getters and private setters follow the established pattern for cluster settings in this class.
server/src/main/java/org/opensearch/threadpool/ThreadPool.java (3)
102-102: LGTM - New thread pool name constant added.
183-183: LGTM - Thread pool type correctly mapped to FIXED. A FIXED pool type is appropriate for translog recovery to provide bounded parallelism.
263-263: Verify unbounded queue is intentional for TRANSLOG_RECOVERY pool. A queue size of `-1` creates an unbounded queue. During large translog recoveries, this could lead to memory pressure if many recovery tasks are queued. Consider whether a bounded queue with a reasonable limit would be more appropriate, similar to other fixed pools like `WRITE` (queue size 10000) or `TRANSLOG_SYNC` (queue size 10000). If unbounded is intentional (e.g., because concurrent recovery is already gated by settings), please document this design choice.
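The trade-off the reviewer raises is the standard one for fixed pools: an unbounded queue never rejects work but can grow without limit, while a bounded queue applies backpressure by rejecting excess submissions. A plain `java.util.concurrent.ThreadPoolExecutor` sketch of the bounded case (not OpenSearch's executor-builder API); the latch keeps the single worker busy so the rejection count is deterministic:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueBoundSketch {
    // Submits `tasks` blocking tasks to a 1-thread pool with a bounded queue and
    // returns how many were rejected by the AbortPolicy.
    public static int rejectedCount(int queueCapacity, int tasks) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueCapacity),   // bounded: capacity `queueCapacity`
            new ThreadPoolExecutor.AbortPolicy());      // reject instead of queueing forever
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.submit(() -> {
                    try { release.await(); } catch (InterruptedException ignored) { }
                });
            } catch (RejectedExecutionException e) {
                rejected++;                             // 1 running + queueCapacity queued, rest rejected
            }
        }
        release.countDown();                            // unblock the worker
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return rejected;
    }
}
```

With capacity 1 and 5 submissions, one task runs, one queues, and three are rejected; an unbounded `LinkedBlockingQueue<>()` would instead accept all five and hold them in memory.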
❌ Gradle check result for ac424e3: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Actionable comments posted: 0
🧹 Nitpick comments (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
5281-5336: Concurrent translog recovery runner looks solid; consider loosening InternalTranslogManager coupling and documenting assumptions. The new `TranslogRecoveryRunner` implementation in `resetEngineToGlobalCheckpoint()` generally looks correct and addresses earlier issues:

- Snapshots created for each batch are now always closed in `finally` via `IOUtils.closeWhileHandlingException`, so there is no longer a leak if any task fails.
- Batch loop bounds use a standard ceil-division pattern, avoiding the extra empty batch when `totalOperations` is an exact multiple of `batchSize`.
- Exceptions from individual futures are aggregated into a single `IOException`, preserving all suppressed causes while still returning the total recovered op count on success.
- Concurrency is conservatively gated on `recoverySettings.isTranslogConcurrentRecoveryEnable()`, `indexSettings.isSegRepEnabledOrRemoteNode()`, `totalOperations > batchSize`, and the TRANSLOG_RECOVERY executor queue being empty, so you fall back to the well-tested sequential path when conditions aren't favorable.

A few follow-ups worth considering:

1. Avoid hard dependency on `InternalTranslogManager` where possible. The `assert translogManager instanceof InternalTranslogManager;` and the downcast to call `getTranslog().newSnapshot(start, end)` tie this path to a specific implementation. If you expect other `TranslogManager` implementations (plugins, future remote variants), it might be cleaner to either expose a `newSnapshot(long fromSeqNo, long toSeqNo)`-style API on `TranslogManager`, or guard the concurrent path with an `instanceof` and fall back to sequential recovery when the manager is not `InternalTranslogManager`. Right now the `assert` only fires with assertions enabled; in production we'd get a `ClassCastException` instead.

2. Clarify assumptions around partitioning by seq_no. The batching logic derives `[start, end]` ranges from `engine.getProcessedLocalCheckpoint()` and `batchSize`, and uses `snapshot.totalOperations()` purely to compute the number of batches. This relies on translog invariants (strictly increasing seq_nos, no concurrent appends because operations are blocked during `resetEngineToGlobalCheckpoint`) so that the union of the per-batch snapshots matches the original recovery range. A short comment here explaining that assumption would help future maintainers understand why it's safe to ignore the passed `snapshot` other than for `totalOperations()`.

3. TRANSLOG_RECOVERY executor assumption. The cast to `OpenSearchThreadPoolExecutor` and the queue-emptiness check are fine with today's `FixedExecutorBuilder` setup for `ThreadPool.Names.TRANSLOG_RECOVERY`, but they implicitly assume that name will always map to an `OpenSearchThreadPoolExecutor`. If that ever changes, a runtime `ClassCastException` will be thrown. A brief comment noting the expectation (or an `instanceof` guard that falls back to sequential recovery if the cast fails) would make this more robust.

If you're comfortable with the current constraints (internal manager only and fixed executor type), this can merge as-is; otherwise the above refactors are low-risk and can be done now or in a follow-up.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- server/src/main/java/org/opensearch/index/shard/IndexShard.java (4 hunks)
- server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (5 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (3)
- server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java (1): InternalTranslogManager (38-480)
- server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1): Names (99-130)
- libs/common/src/main/java/org/opensearch/common/util/io/IOUtils.java (1): IOUtils (58-317)
🔇 Additional comments (2)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
106-107: New imports align with concurrent translog recovery usage. The added imports (`OpenSearchThreadPoolExecutor`, `InternalTranslogManager`, `TranslogManager`, `Future`) are consistent with the new concurrent recovery runner and TRANSLOG_RECOVERY thread pool usage; no issues here. Also applies to: 187-195, 246-247.
server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java (1)
233-246: Concurrent translog recovery settings wiring is correct; clarify intended default batch size. The settings, backing fields, constructor initialization, and dynamic update consumers are all consistent and thread-safe; the minimum of `1` on `INDICES_TRANSLOG_CONCURRENT_RECOVERY_BATCH_SIZE` prevents division by zero.

The same `batch_size` setting serves dual purposes in `IndexShard`:

- Threshold for switching from single-threaded to concurrent recovery (`totalOperations > batchSize`, line 5290)
- Per-thread batch size, with the batch count calculated as `(totalOperations + batchSize - 1) / batchSize` (line 5296)

This dual use is correct but worth documenting: tuning the batch size affects both when concurrency activates and how work is divided per thread. If separate tuning of "concurrency threshold" vs "operations per thread" is needed later, consider splitting into two settings.

Please confirm the intended default of `50,000` operations for this batch size.
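The dual role described above means one knob moves two things at once. A tiny sketch of the two roles, using the reviewer's formulas (illustrative names, not the PR's API):

```java
public class BatchSizeDualUse {
    // Role 1: concurrency only activates above the batch-size threshold.
    public static boolean concurrencyActivates(int totalOperations, long batchSize) {
        return totalOperations > batchSize;
    }

    // Role 2: the same setting fixes how many batches (units of per-thread work) are created.
    public static long batchCount(int totalOperations, long batchSize) {
        return (totalOperations + batchSize - 1) / batchSize; // ceil division
    }
}
```

For example, with a batch size of 50,000, recovering 50,000 operations stays sequential while 50,001 operations activate concurrency and split into 2 batches; halving the batch size both lowers the activation threshold and doubles the batch count, which is why separate settings might be worth considering.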
Actionable comments posted: 1
♻️ Duplicate comments (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
5279-5332: Make concurrent translog recovery robust to snapshot/submit failures and tighten batch logic. The concurrent `TranslogRecoveryRunner` is close, but there are two important issues:

1. Snapshot leak on failures in the creation loop. If either:
   - `engine.translogManager().newChangesSnapshot(start, end, false)` throws, or
   - `threadPool.executor(...).submit(...)` throws (e.g., `RejectedExecutionException` during shutdown),

   the method exits before reaching the second loop that closes snapshots. Any snapshots created in prior iterations of the loop are never closed, leaking file descriptors until shard/engine shutdown. To fix this, wrap the batch creation + execution phase in a `try/finally` that always closes every `Translog.Snapshot` you created, and explicitly close the snapshot if `submit()` fails before it is added to the list.

2. Cleaner last-batch detection. The last-batch check `i == totalOperations / batchSize` relies on the same arithmetic as the loop bound but is harder to read and easy to get wrong during future changes. Precomputing the batch count and checking against `batches - 1` is clearer and matches the `ceil` formula used in the loop bound.

A minimally invasive refactor that addresses both points:

```diff
-        final TranslogRecoveryRunner translogRunner = (snapshot) -> {
-            Engine engine = newEngineReference.get();
-            assert null != engine;
-            int totalOperations = snapshot.totalOperations();
-            long batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
-            boolean threadPoolNotBusy = ((OpenSearchThreadPoolExecutor) threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)).getQueue()
-                .isEmpty();
-            if (recoverySettings.isTranslogConcurrentRecoveryEnable()
-                && indexSettings.isSegRepEnabledOrRemoteNode()
-                && totalOperations > batchSize
-                && threadPoolNotBusy) {
-                long localCheckpoint = engine.getProcessedLocalCheckpoint();
-                List<Tuple<Future<Integer>, Translog.Snapshot>> translogSnapshotsFutureList = new ArrayList<>();
-                for (int i = 0; i < (totalOperations + batchSize - 1) / batchSize; i++) {
-                    long start = localCheckpoint + 1 + (long) i * batchSize;
-                    long end = (i == totalOperations / batchSize) ? Long.MAX_VALUE : start + batchSize - 1;
-                    Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false);
-                    translogSnapshotsFutureList.add(
-                        new Tuple<>(
-                            threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
-                                .submit(() -> runTranslogRecovery(engine, translogSnapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                                    // TODO: add a dedicate recovery stats for the reset translog
-                                })),
-                            translogSnapshot
-                        )
-                    );
-                }
-                Exception exception = null;
-                int totalRecovered = 0;
-                for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
-                    try {
-                        int recoveredOps = translogSnapshotFuture.v1().get();
-                        totalRecovered += recoveredOps;
-                    } catch (Exception e) {
-                        if (exception == null) {
-                            exception = e;
-                        } else {
-                            exception.addSuppressed(e);
-                        }
-                    } finally {
-                        Translog.Snapshot translogSnapshot = translogSnapshotFuture.v2();
-                        IOUtils.closeWhileHandlingException(translogSnapshot);
-                    }
-                }
-                if (exception != null) {
-                    throw new IOException("generate exception when concurrent translog recovery", exception);
-                }
-                return totalRecovered;
-            } else {
-                return runTranslogRecovery(newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
-                    // TODO: add a dedicate recovery stats for the reset translog
-                });
-            }
-        };
+        final TranslogRecoveryRunner translogRunner = (snapshot) -> {
+            Engine engine = newEngineReference.get();
+            assert engine != null;
+            final int totalOperations = snapshot.totalOperations();
+            final long batchSize = recoverySettings.getTranslogConcurrentRecoveryBatchSize();
+            final boolean threadPoolNotBusy = ((OpenSearchThreadPoolExecutor) threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY))
+                .getQueue()
+                .isEmpty();
+
+            if (recoverySettings.isTranslogConcurrentRecoveryEnable()
+                && indexSettings.isSegRepEnabledOrRemoteNode()
+                && totalOperations > batchSize
+                && threadPoolNotBusy) {
+                final long localCheckpoint = engine.getProcessedLocalCheckpoint();
+                final long batches = (totalOperations + batchSize - 1) / batchSize;
+                final List<Tuple<Future<Integer>, Translog.Snapshot>> translogSnapshotsFutureList = new ArrayList<>();
+                Exception exception = null;
+                int totalRecovered = 0;
+
+                try {
+                    for (long i = 0; i < batches; i++) {
+                        final long start = localCheckpoint + 1 + i * batchSize;
+                        final long end = (i == batches - 1) ? Long.MAX_VALUE : start + batchSize - 1;
+                        final Translog.Snapshot translogSnapshot = engine.translogManager().newChangesSnapshot(start, end, false);
+                        try {
+                            Future<Integer> future = threadPool.executor(ThreadPool.Names.TRANSLOG_RECOVERY)
+                                .submit(
+                                    () -> runTranslogRecovery(
+                                        engine,
+                                        translogSnapshot,
+                                        Engine.Operation.Origin.LOCAL_RESET,
+                                        () -> {
+                                            // TODO: add a dedicate recovery stats for the reset translog
+                                        }
+                                    )
+                                );
+                            translogSnapshotsFutureList.add(new Tuple<>(future, translogSnapshot));
+                        } catch (RuntimeException e) {
+                            IOUtils.closeWhileHandlingException(translogSnapshot);
+                            throw e;
+                        }
+                    }
+
+                    for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
+                        try {
+                            int recoveredOps = translogSnapshotFuture.v1().get();
+                            totalRecovered += recoveredOps;
+                        } catch (Exception e) {
+                            if (exception == null) {
+                                exception = e;
+                            } else {
+                                exception.addSuppressed(e);
+                            }
+                        }
+                    }
+
+                    if (exception != null) {
+                        throw new IOException("exception during concurrent translog recovery", exception);
+                    }
+                    return totalRecovered;
+                } finally {
+                    for (Tuple<Future<Integer>, Translog.Snapshot> translogSnapshotFuture : translogSnapshotsFutureList) {
+                        IOUtils.closeWhileHandlingException(translogSnapshotFuture.v2());
+                    }
+                }
+            } else {
+                return runTranslogRecovery(newEngineReference.get(), snapshot, Engine.Operation.Origin.LOCAL_RESET, () -> {
+                    // TODO: add a dedicate recovery stats for the reset translog
+                });
+            }
+        };
```

This keeps the overall behavior and conditions intact, but ensures snapshots are always closed and makes the batch structure easier to reason about.
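The leak fix above follows a general pattern: resources created during fan-out must be released in a `finally` that runs whether submission, execution, or result collection fails. A generic, runnable sketch with `AutoCloseable` stand-ins (hypothetical `Resource`/`runBatches` names, not the PR's types):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class CloseOnAnyFailure {
    static final AtomicInteger openResources = new AtomicInteger();

    // Stand-in for a per-batch resource such as a translog snapshot.
    static class Resource implements AutoCloseable {
        Resource() { openResources.incrementAndGet(); }
        public void close() { openResources.decrementAndGet(); }
    }

    // Runs `batches` tasks; the task at index `failAt` throws. Returns the number
    // of successful batches or rethrows the aggregated failure. Either way, every
    // resource is closed in the finally block.
    public static int runBatches(int batches, int failAt) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<Integer>> futures = new ArrayList<>();
        List<Resource> resources = new ArrayList<>();
        try {
            for (int i = 0; i < batches; i++) {
                Resource r = new Resource();
                resources.add(r);                      // track before submit, so finally sees it
                final int batch = i;
                futures.add(pool.submit(() -> {
                    if (batch == failAt) throw new IllegalStateException("batch failed");
                    return 1;
                }));
            }
            int total = 0;
            Exception failure = null;
            for (Future<Integer> f : futures) {
                try {
                    total += f.get();
                } catch (ExecutionException e) {
                    if (failure == null) failure = e; else failure.addSuppressed(e);
                }
            }
            if (failure != null) throw failure;
            return total;
        } finally {
            for (Resource r : resources) r.close();    // always runs, success or failure
            pool.shutdown();
        }
    }
}
```

After either a clean run or a failing one, the open-resource count returns to zero, which is exactly the property the suggested `try/finally` restores in the PR.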
🧹 Nitpick comments (2)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2)
777-781: Consider verifying that concurrent recovery was actually used. While the test correctly configures concurrent translog recovery and verifies the end result, it doesn't explicitly verify that the concurrent code path was exercised. Consider adding assertions or logging to confirm that concurrent recovery actually occurred (e.g., checking thread pool activity or recovery metrics).
717-800: Consider extracting common logic to reduce duplication. This test method shares significant logic with `testNRTReplicaPromotedAsPrimary` (lines 533-599). While the duplication is reasonable for testing different scenarios, consider extracting the common test flow into a helper method that accepts `RecoverySettings` and `MergedSegmentPublisher` as parameters. This would improve maintainability if the test logic needs to change in the future.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- server/src/main/java/org/opensearch/index/shard/IndexShard.java (3 hunks)
- server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (2)
- server/src/main/java/org/opensearch/threadpool/ThreadPool.java (1): Names (99-130)
- server/src/main/java/org/opensearch/index/engine/Engine.java (1): Operation (1503-1603)
🔇 Additional comments (7)
server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithNodeToNodeIndexShardTests.java (6)
38-38: LGTM! The import is correctly added to support the new test method's usage of `MergedSegmentPublisher.EMPTY`.
718-724: LGTM! The `RecoverySettings` configuration correctly enables concurrent translog recovery with a small batch size (10) to ensure the concurrent code path is exercised when larger document counts are indexed.
725-738: LGTM! The replication group setup correctly passes the custom `RecoverySettings` and `MergedSegmentPublisher.EMPTY` to enable concurrent translog recovery testing.
740-752: LGTM! The initial document indexing, replication, and assertions correctly verify the starting state before testing concurrent translog recovery.

773-775: LGTM! The primary promotion logic correctly syncs the global checkpoint before promoting the replica.

783-798: LGTM! The final assertions comprehensively verify engine types, document counts, translog state, and consistency across all shards after concurrent translog recovery.
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
105-107: Imports for concurrent translog recovery look correct. `OpenSearchThreadPoolExecutor` and `Future` are appropriately imported and used in the new concurrent translog recovery logic; no issues here. Also applies to: 243-245.
Force-pushed 8f37578 to cc6738c (compare).
❌ Gradle check result for cc6738c: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Force-pushed cc6738c to 905ccb3 (compare).
❕ Gradle check result for 905ccb3: UNSTABLE. Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure.
Thank you for your rigorous review @atris. Based on your review suggestions, I have made the following optimizations. Could you please help review it again?
…t_translog_recovery
❌ Gradle check result for 46ca835: FAILURE. Please examine the workflow log, locate and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Force-pushed 320c67a to 0cf9051 (compare).
…ion primary promotion (opensearch-project#20251)

* support concurrent translog recovery
* update
* update
* update
* refactor
* update
* refactor test
* add translogConcurrentRecoverySemaphore
* use FutureUtils.cancel
* use FutureUtils.cancel
* Limit the maximum value of batch to 1 million
* add UT
* simplify snapshot close and fix test
* add change log
* Add comments

Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Description
The purpose of this PR is to introduce the translog concurrent recovery mechanism mentioned in #20131, which is used to accelerate the primary promotion of segment replication.
Default concurrency strategy:

- Introduce a dedicated fixed thread pool `translog_recovery` with a size equal to the number of cores and an unbounded queue.
- Introduce the dynamic setting `indices.translog_concurrent_recovery.enable`, which is used to enable translog concurrent recovery, with a default value of `false`.
- Introduce the dynamic setting `indices.translog_concurrent_recovery.batch_size`, which is used to represent the number of translog operations processed by a single thread, with a default value of `500,000`.
- If the number of operations to recover does not exceed `indices.translog_concurrent_recovery.batch_size`, concurrency is not enabled, remaining consistent with the current execution logic.
- If the number of operations to recover exceeds `indices.translog_concurrent_recovery.batch_size`:
  - If the thread pool `translog_recovery` is not busy, enable concurrency, with each thread responsible for executing the number of operations specified by `indices.translog_concurrent_recovery.batch_size`.
  - If the thread pool `translog_recovery` is busy, do not enable concurrency, keeping it consistent with the current execution logic.
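The strategy above reduces to a single predicate: concurrency is used only when every condition holds, and otherwise the existing sequential path runs unchanged. A sketch of that gate (illustrative names and parameters, not the PR's API; the "busy" check here is modeled as a non-empty `translog_recovery` queue):

```java
public class ConcurrencyGate {
    public static boolean useConcurrentRecovery(
            boolean settingEnabled,      // indices.translog_concurrent_recovery.enable
            boolean segRepOrRemote,      // shard uses segment replication or a remote store
            int totalOperations,         // translog operations to recover
            long batchSize,              // indices.translog_concurrent_recovery.batch_size
            int queuedRecoveryTasks) {   // pending tasks in the translog_recovery pool
        return settingEnabled
            && segRepOrRemote
            && totalOperations > batchSize   // below the threshold, stay sequential
            && queuedRecoveryTasks == 0;     // busy pool also falls back to sequential
    }
}
```

With the default `batch_size` of 500,000, for example, a recovery of 600,000 operations on an idle pool goes concurrent, while disabling the setting, a smaller backlog, or a busy pool each force the sequential fallback.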
Resolves #20131
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.