Add warmup phase for pull-based ingestion #20526
msfroh merged 12 commits into opensearch-project:main
📝 Walkthrough
Adds a warmup phase for pull-based ingestion to wait for lag to catch up before serving. Introduces six new configuration settings in IndexMetadata, propagates them through IngestionSource and IngestionEngine, and implements warmup state tracking and timeout handling in DefaultStreamPoller.
Sequence Diagram
sequenceDiagram
participant Shard as IndexShard
participant Engine as IngestionEngine
participant Poller as DefaultStreamPoller
participant Source as Stream Source
Shard->>Engine: postRecovery()
Engine->>Engine: refresh()
Engine->>Poller: isWarmupComplete()?
Poller->>Source: poll for lag
alt Warmup Disabled
Poller-->>Engine: true (warmup complete)
else Lag Below Threshold
Poller->>Poller: checkWarmupStatus()
Poller-->>Engine: true (warmup complete)
else Timeout Elapsed
alt failOnTimeout = true
Poller-->>Engine: throw exception
else failOnTimeout = false
Poller-->>Engine: true (proceed with warning)
end
end
Engine->>Shard: warmup done, ready to serve
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@CHANGELOG.md`:
- Line 28: Update the changelog entry so the link references the correct PR
number: replace the incorrect reference "#20506" with "#20526" in the line
containing "Add warmup phase to wait for lag to catch up in pull-based ingestion
before serving" so the hyperlink points to PR 20526 rather than an issue number.
In `@server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java`:
- Around line 995-1067: getIngestionSource currently reads several ingestion
settings but omits the six warmup settings declared (INGESTION_SOURCE_WARMUP_*)
so index-level warmup config is ignored; in getIngestionSource() read each
warmup setting from settings using INGESTION_SOURCE_WARMUP_ENABLED_SETTING,
INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING,
INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING,
INGESTION_SOURCE_WARMUP_USE_TIME_LAG_SETTING,
INGESTION_SOURCE_WARMUP_TIME_LAG_THRESHOLD_SETTING, and
INGESTION_SOURCE_WARMUP_FAIL_ON_TIMEOUT_SETTING and pass them into the
IngestionSource.Builder chain by calling the corresponding builder setters (e.g.
setWarmupEnabled, setWarmupTimeout, setWarmupLagThreshold, setWarmupUseTimeLag,
setWarmupTimeLagThreshold, setWarmupFailOnTimeout) so the builder receives the
index-level values.
In
`@server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java`:
- Around line 450-457: Add a log statement when the code falls back to
time-based lag in DefaultStreamPoller: inside the branch where (warmupUseTimeLag
|| cachedPointerBasedLag < 0) and before calling computeTimeBasedLag(), log that
the poller is using time-based lag (include values of cachedPointerBasedLag and
warmupTimeLagThreshold) so operators can see the active lag metric; reference
the variables/methods: warmupUseTimeLag, cachedPointerBasedLag,
computeTimeBasedLag(), warmupTimeLagThreshold, and the enclosing
DefaultStreamPoller class when adding the log.
🧹 Nitpick comments (4)
server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java (2)
909-911: Unused variable `completed`: consider asserting on the return value or removing it.
The return value of `awaitWarmupComplete(2000)` is stored but not used in the assertion. Either assert on `completed` directly, or remove the variable.
♻️ Suggested fix
    - boolean completed = warmupPoller.awaitWarmupComplete(2000);
    - // Even if lag never reaches threshold, warmup completes on timeout
    - assertTrue("Warmup should complete after timeout", warmupPoller.isWarmupComplete());
    + warmupPoller.awaitWarmupComplete(2000);
    + // Even if lag never reaches threshold, warmup completes on timeout
    + assertTrue("Warmup should complete after timeout", warmupPoller.isWarmupComplete());
Or alternatively, assert on the return value:
    - boolean completed = warmupPoller.awaitWarmupComplete(2000);
    - // Even if lag never reaches threshold, warmup completes on timeout
    - assertTrue("Warmup should complete after timeout", warmupPoller.isWarmupComplete());
    + // awaitWarmupComplete may return false due to its own timeout,
    + // but warmup should be complete due to internal timeout
    + warmupPoller.awaitWarmupComplete(2000);
    + assertTrue("Warmup should complete after internal timeout", warmupPoller.isWarmupComplete());
946-947: `Thread.sleep(200)` may cause test flakiness.
Fixed sleep times in tests can lead to flaky behavior in CI environments with varying load. Consider using `assertBusy()` or `waitUntil()` as used elsewhere in this test class.
♻️ Suggested fix
      warmupPoller.start();
    - // Give it time to start
    - Thread.sleep(200);
    + // Wait for warmup to start
    + waitUntil(() -> warmupPoller.getState() != StreamPoller.State.NONE, awaitTime, TimeUnit.MILLISECONDS);
server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java (1)
182-187: Minor: `Objects.equals()` on primitives causes autoboxing.
Using `Objects.equals()` on primitive types (`boolean`, `long`) causes unnecessary autoboxing. While functionally correct, using `==` for primitive comparisons is more efficient.
♻️ Suggested fix
      && Objects.equals(mapperType, ingestionSource.mapperType)
    - && Objects.equals(warmupEnabled, ingestionSource.warmupEnabled)
    + && warmupEnabled == ingestionSource.warmupEnabled
      && Objects.equals(warmupTimeout, ingestionSource.warmupTimeout)
    - && Objects.equals(warmupLagThreshold, ingestionSource.warmupLagThreshold)
    - && Objects.equals(warmupUseTimeLag, ingestionSource.warmupUseTimeLag)
    - && Objects.equals(warmupTimeLagThreshold, ingestionSource.warmupTimeLagThreshold)
    - && Objects.equals(warmupFailOnTimeout, ingestionSource.warmupFailOnTimeout);
    + && warmupLagThreshold == ingestionSource.warmupLagThreshold
    + && warmupUseTimeLag == ingestionSource.warmupUseTimeLag
    + && warmupTimeLagThreshold == ingestionSource.warmupTimeLagThreshold
    + && warmupFailOnTimeout == ingestionSource.warmupFailOnTimeout;
server/src/main/java/org/opensearch/index/engine/IngestionEngine.java (1)
684-689: Add `isWarmupFailOnTimeout()` to the `StreamPoller` interface for API consistency.
The `instanceof` check and cast to `DefaultStreamPoller` creates an inconsistent API surface. The other warmup methods (`isWarmupComplete()`, `awaitWarmupComplete()`) are declared in the `StreamPoller` interface, but `isWarmupFailOnTimeout()` requires knowledge of the concrete implementation.
♻️ Suggested approach
Add the method to the `StreamPoller` interface:
    // In StreamPoller.java
    boolean isWarmupFailOnTimeout();
Then in `IngestionEngine.java`:
      public boolean isWarmupFailOnTimeout() {
    -     if (streamPoller instanceof DefaultStreamPoller) {
    -         return ((DefaultStreamPoller) streamPoller).isWarmupFailOnTimeout();
    -     }
    -     return false;
    +     return streamPoller.isWarmupFailOnTimeout();
      }
📒 Files selected for processing (10)
- CHANGELOG.md
- server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
- server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java
- server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
- server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
- server/src/main/java/org/opensearch/index/shard/IndexShard.java
- server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
- server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java
- server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java
- server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-13T17:40:27.167Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20411
File: server/src/main/java/org/opensearch/index/codec/CodecService.java:112-133
Timestamp: 2026-01-13T17:40:27.167Z
Learning: Avoid capturing or evaluating a supplier (e.g., this::defaultCodec) upfront when passing it to a registry during object construction. If registries may replace defaults during iteration (as in EnginePlugin.getAdditionalCodecs), pass the supplier itself and only resolve it at use time. This ensures dynamic behavior is preserved during initialization and prevents premature binding of defaults in codecs/registry setup. This pattern should apply to similar initialization paths in Java server code where registries may mutate defaults during construction.
Applied to files:
- server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
- server/src/main/java/org/opensearch/index/shard/IndexShard.java
- server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java
- server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
- server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
- server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java
- server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
🔇 Additional comments (15)
server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java (2)
86-97: No changes needed. The warmup methods are properly implemented in the only StreamPoller implementation (`DefaultStreamPoller`), and the interface is marked `@ExperimentalApi`, where breaking changes are expected and documented. The suggested default methods returning `true` would incorrectly bypass the warmup synchronization logic and defeat the feature's purpose.
Likely an incorrect or invalid review comment.
102-109: State handling for `WARMING_UP` is complete; no action needed.
The codebase does not use switch statements on `StreamPoller.State` that would require explicit `WARMING_UP` handling. Instead, state transitions are managed via conditional assignments: `WARMING_UP` is set when warmup is enabled (line 237), and subsequent states (`CLOSED`, `PAUSED`, `POLLING`, `PROCESSING`) are assigned based on runtime conditions in the main loop. The only state comparison (`while (state != State.CLOSED)`) doesn't require a `WARMING_UP` case. The state machine design is inherently safe and avoids fall-through to default/illegal paths.
Likely an incorrect or invalid review comment.
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
2476-2499: Confirm warmup wait exits promptly on shard close.
The warmup wait runs while holding `postRecoveryMutex`. If the shard closes while waiting, this could hold the mutex until timeout and delay shutdown unless `awaitWarmupComplete` aborts on engine close/interrupt. Please confirm it does, or add a closed-state/cancellation check to avoid long waits during shutdown.
server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java (2)
110-112: toString expectation updated appropriately.
This keeps the test aligned with the new warmup fields.
141-222: Good warmup configuration coverage.
These tests nicely cover defaults, equality, and copy semantics for the new warmup fields.
server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java (1)
292-297: Warmup settings registered correctly.
Adding these to the built-in index settings set is the right place to ensure validation and visibility.
server/src/main/java/org/opensearch/index/engine/IngestionEngine.java (1)
150-156: LGTM! Warmup configuration is cleanly propagated from `IngestionSource` to the stream poller builder.
server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java (2)
98-105: LGTM! Correctly disabling warmup for existing tests preserves backward compatibility and test stability.
801-833: LGTM: good coverage of warmup scenarios. The new warmup tests cover:
- Disabled warmup behavior
- Immediate completion when lag threshold is met
- Timeout behavior
- Time-based lag configuration
- `isWarmupFailOnTimeout` flag
Also applies to: 835-873, 956-989, 991-1045
server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java (2)
52-58: LGTM! Warmup configuration fields, constructor assignments, and getters are cleanly implemented with appropriate encapsulation.
Also applies to: 90-96, 142-164
316-322: LGTM! Builder warmup fields with defaults, copy constructor propagation, setters, and build() method are correctly implemented.
Also applies to: 338-343, 401-429, 443-449
server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java (4)
60-69: LGTM! Appropriate use of `volatile` for fields accessed across threads (`warmupComplete`, `warmupStartTime`) and a dedicated lock object for wait/notify coordination.
Also applies to: 192-204
234-239: LGTM! Warmup initialization correctly sets the start time and transitions to `WARMING_UP` state before the polling loop begins.
419-440: LGTM! The `awaitWarmupComplete()` implementation correctly handles the wait/notify pattern with timeout. The early return check outside the synchronized block using volatile read is an appropriate optimization, and the synchronized block properly handles races.
741-747: LGTM! Builder defaults match the IndexMetadata settings as documented, and all warmup setters follow the fluent builder pattern consistently.
Also applies to: 847-921
❌ Gradle check result for 3869bb5: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
❌ Gradle check result for 4d0e177: FAILURE
❌ Gradle check result for 0fa161c: FAILURE
❌ Gradle check result for f62071f: FAILURE
❌ Gradle check result for c51dfa0: FAILURE (reported twice)
This PR introduces a warmup phase for pull-based ingestion that prevents shards from serving queries until they have caught up with the streaming source. This is analogous to how push-based replication waits for replicas to sync before serving.
Key changes:
- Add warmup settings: enabled (default: true), timeout, lag_threshold, fail_on_timeout
- Create WarmupConfig class to encapsulate warmup configuration
- Block shard in postRecovery() until warmup completes or times out
- Use CountDownLatch for thread-safe warmup blocking/signaling
- Encapsulate timeout and error handling in IngestionEngine.awaitWarmupComplete()
New index settings:
- index.ingestion_source.warmup.enabled (boolean, default: true)
- index.ingestion_source.warmup.timeout (time, default: 5m)
- index.ingestion_source.warmup.lag_threshold (long, default: 0)
- index.ingestion_source.warmup.fail_on_timeout (boolean, default: false)
Resolves opensearch-project#20506
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

…to 100
- Change default lag_threshold from 0 to 100 messages per msfroh's suggestion
- Add WarmupConfig.DEFAULT constant for cleaner initialization
- Refactor DefaultStreamPoller to accept WarmupConfig object instead of 4 individual parameters
- Extract warmup handling in IndexShard to handlePullBasedIngestionWarmup() method
- Update tests to use WarmupConfig
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

- Add isPaused() check in IngestionEngine.awaitWarmupComplete() to skip waiting
- Add paused check in DefaultStreamPoller.checkWarmupStatus() to mark warmup complete
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

- Add setStateWithWarmupAwareness method to keep WARMING_UP state during warmup
- Replace direct state assignments with setStateWithWarmupAwareness calls
- Update CHANGELOG with latest from main
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

- Add testWarmupPhase IT to verify warmup functionality end-to-end
- Fix IngestionSourceTests to expect lagThreshold=100 (new default)
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

- Remove WarmupConfig.DEFAULT constant to avoid defaults drifting from IndexMetadata settings (msfroh)
- Replace waitForSearchableDocs with direct refresh + assert in warmup IT to catch warmup issues immediately (varunbharadwaj)
- Rename checkWarmupStatus() to updateWarmupStatus() for clarity (varunbharadwaj)
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

…mpletion
Before lag is first fetched from the consumer, cachedPointerBasedLag was 0, which could satisfy the lagBelowThreshold check (0 <= threshold) and cause warmup to complete before real lag data is available. Using -1 as a sentinel ensures the existing 'currentLag >= 0' guard prevents premature completion.
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

Tests that don't test warmup behavior need warmup.enabled=false, since warmup (enabled by default) blocks shard initialization, causing createIndex assertAcked to fail with 'not all shards were started'.
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

- Change @ExperimentalApi to @PublicApi(since = "3.6.0") on WarmupConfig
- Remove unused isWarmupComplete() from IngestionEngine
- Add comment on cachedPointerBasedLag = -1 explaining sentinel value
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

…eanup
- Convert WarmupConfig from class to record (andrross)
- Change warmup default to enabled=false (varun - new feature, opt-in)
- Remove separate warmup convenience methods from IngestionSource; callers now use getWarmupConfig().xyz() directly
- Remove unused isWarmupFailOnTimeout() from IngestionEngine
- Fix outdated javadoc: cachedPointerBasedLag is -1 not 0
- Fix CHANGELOG to only contain our entry
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>

Per andrross review feedback:
- Remove warmup.enabled setting: use timeout=-1 (disabled) vs timeout>=0 (enabled)
- Remove warmup.fail_on_timeout setting: always proceed with warning on timeout to avoid shard init failure loops (livelock risk)
- WarmupConfig record now has only 2 fields: timeout and lagThreshold
- Default: warmup disabled (timeout=-1), opt-in by setting a positive timeout
- Adapt to upstream Indexer refactor (getEngine -> getIndexer/EngineBackedIndexer)
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
msfroh left a comment
Looks good. I agree with @varunbharadwaj that we should probably make these settings (at least the magnitude of lag) dynamic in a subsequent PR.
Thanks, @kaustubhbutte17!
This PR introduces a warmup phase for pull-based ingestion that prevents shards from serving queries until they have caught up with the streaming source. This is analogous to how push-based replication waits for replicas to sync before serving.
Key changes:
- Add warmup settings: enabled (default: true), timeout, lag_threshold, fail_on_timeout
- Create WarmupConfig class to encapsulate warmup configuration
- Block shard in postRecovery() until warmup completes or times out
- Use CountDownLatch for thread-safe warmup blocking/signaling
- Encapsulate timeout and error handling in IngestionEngine.awaitWarmupComplete()
---------
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
Signed-off-by: kkewwei <kkewwei@163.com>
Description
In pull-based ingestion, when a new node is added or shard relocation happens, consumption starts from the previous checkpoint offset. During this time, the shard serves stale data until the lag is consumed.
This PR introduces a "warmup" phase that prevents shards from serving queries until they have caught up with the streaming source - analogous to how push-based replication waits for replicas to sync before serving.
Resolves #20506
Solution
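When warmup is enabled, a shard entering the `postRecovery` phase will block until either:
- the ingestion lag drops to or below `index.ingestion_source.warmup.lag_threshold`, or
- `index.ingestion_source.warmup.timeout` elapses, in which case the shard proceeds with a warning.
This ensures data consistency for users who prefer correctness over immediate availability.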
New Settings
- `index.ingestion_source.warmup.timeout`: `-1` = disabled, `>=0` = enabled with that timeout
- `index.ingestion_source.warmup.lag_threshold`
All settings are Final: set at index creation time, cannot be changed later.
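To make the timeout convention concrete, here is a minimal sketch of a two-field configuration holder. It is not the PR's `WarmupConfig`; the name `WarmupConfigSketch`, the `timeoutMillis` field (a plain `long` rather than an OpenSearch time value), and the `isEnabled()` helper are assumptions made purely for illustration.

```java
// Hypothetical stand-in for the PR's WarmupConfig record. The field name
// timeoutMillis and its long type are assumptions made for this sketch.
record WarmupConfigSketch(long timeoutMillis, long lagThreshold) {

    // Sentinel timeout value meaning "warmup disabled", as described above.
    static final long DISABLED_TIMEOUT = -1L;

    // Warmup is opt-in: only a non-negative timeout enables it.
    boolean isEnabled() {
        return timeoutMillis >= 0;
    }
}

class WarmupConfigDemo {
    public static void main(String[] args) {
        WarmupConfigSketch disabled = new WarmupConfigSketch(WarmupConfigSketch.DISABLED_TIMEOUT, 100);
        WarmupConfigSketch fiveMinutes = new WarmupConfigSketch(5 * 60 * 1000L, 50);
        System.out.println(disabled.isEnabled());    // prints "false"
        System.out.println(fiveMinutes.isEnabled()); // prints "true"
    }
}
```

Folding the on/off switch into the timeout keeps the configuration to two values, which matches the review feedback recorded in the commit history.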
State Machine
During WARMING_UP:
- Poller state is reported as `WARMING_UP` via `GET /{index}/ingestion/_state`
- Shard state remains `INITIALIZING`: it does NOT serve search queries
- Search requests fail with `503 all shards failed`
Usage Example
Enable warmup with 5 minute timeout:
    { "settings": { "index.ingestion_source.warmup.timeout": "5m" } }
Custom warmup with lag threshold:
    { "settings": { "index.ingestion_source.warmup.timeout": "10m", "index.ingestion_source.warmup.lag_threshold": 50 } }
Warmup disabled (default):
No settings needed. Default `timeout=-1` means warmup is disabled.
Implementation Details
- `WarmupConfig` record class in `IngestionSource` with two fields: `timeout` and `lagThreshold`
- `timeout = -1` means warmup disabled; `timeout >= 0` means enabled
- Shard blocks in `IndexShard.postRecovery()` before transitioning to `POST_RECOVERY` state
- Timeout and error handling encapsulated in `IngestionEngine.awaitWarmupComplete()`
- Uses `CountDownLatch` for thread-safe blocking/signaling
- `cachedPointerBasedLag` initialized to `-1` to prevent premature warmup completion before lag data is available
- Warmup state visible via `GET /{index}/ingestion/_state` API (`poller_state: WARMING_UP`)
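
The latch-based blocking and the `-1` lag sentinel can be sketched roughly as follows. This is a simplified illustration, not the OpenSearch code: `WarmupGateSketch`, `recordLag`, and the millisecond timeout parameter are hypothetical names chosen for the example, and the real poller does considerably more (pause handling, state reporting, logging on timeout).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the OpenSearch implementation.
final class WarmupGateSketch {
    private final long lagThreshold;
    private final CountDownLatch warmupLatch = new CountDownLatch(1);

    // -1 is a sentinel meaning "no lag value has been fetched yet".
    private volatile long cachedLag = -1L;

    WarmupGateSketch(long lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    // Called by the polling thread when the consumer reports a lag value.
    void recordLag(long lag) {
        cachedLag = lag;
    }

    // Releases waiters only once a real (non-negative) lag is at or below the threshold.
    void updateWarmupStatus() {
        long currentLag = cachedLag;
        if (currentLag >= 0 && currentLag <= lagThreshold) {
            warmupLatch.countDown();
        }
    }

    // Blocks the caller until warmup completes or the timeout elapses.
    // Returns true if warmup completed, false on timeout or interruption.
    boolean awaitWarmupComplete(long timeoutMillis) {
        try {
            return warmupLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}

class WarmupGateDemo {
    public static void main(String[] args) {
        WarmupGateSketch gate = new WarmupGateSketch(100);
        gate.updateWarmupStatus();                        // no lag data yet: sentinel blocks completion
        System.out.println(gate.awaitWarmupComplete(10)); // prints "false"
        gate.recordLag(40);
        gate.updateWarmupStatus();                        // 40 <= 100: latch released
        System.out.println(gate.awaitWarmupComplete(10)); // prints "true"
    }
}
```

Had the cached lag started at `0`, the first status check would have released the latch immediately for any non-negative threshold, which is the premature completion the sentinel is meant to avoid.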