Add warmup phase for pull-based ingestion #20526

Merged
msfroh merged 12 commits into opensearch-project:main from kaustubhbutte17:feature/warmup-pull-ingestion
Mar 19, 2026

@kaustubhbutte17 kaustubhbutte17 commented Feb 2, 2026

Description

In pull-based ingestion, when a new node is added or shard relocation happens, consumption starts from the previous checkpoint offset. During this time, the shard serves stale data until the lag is consumed.

This PR introduces a "warmup" phase that prevents shards from serving queries until they have caught up with the streaming source - analogous to how push-based replication waits for replicas to sync before serving.

Resolves #20506

Solution

When warmup is enabled, a shard entering the postRecovery phase will block until:

  1. The pointer-based lag drops to or below the configured threshold, OR
  2. The timeout is reached (proceeds with warning)

This ensures data consistency for users who prefer correctness over immediate availability.
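
The two exit conditions above can be sketched with a `CountDownLatch`, which the implementation notes in this description name as the signaling primitive. This is a minimal illustration under stated assumptions, not the merged code: the class `WarmupGateSketch` and its methods `onLagUpdate` and `await` are hypothetical names, and the timeout is a plain millisecond value rather than a `TimeValue`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a warmup gate; names and shape are assumptions, not the PR's code. */
final class WarmupGateSketch {
    private final long lagThreshold;
    private final CountDownLatch caughtUp = new CountDownLatch(1);

    WarmupGateSketch(long lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    /** Called by the polling thread whenever a lag value is computed. */
    void onLagUpdate(long pointerBasedLag) {
        // A negative value stands for "lag not known yet" and never completes warmup.
        if (pointerBasedLag >= 0 && pointerBasedLag <= lagThreshold) {
            caughtUp.countDown();
        }
    }

    /**
     * Blocks the calling (recovery) thread.
     * Returns true if lag reached the threshold, false if the timeout elapsed
     * or the thread was interrupted; on false the caller would log a warning and proceed.
     */
    boolean await(long timeoutMillis) {
        try {
            return caughtUp.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

In this sketch the recovery thread would call `await(timeoutMillis)` while the poller thread keeps calling `onLagUpdate(...)`; either outcome lets the shard move on, which matches the "proceeds with warning" behavior described above.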

New Settings

| Setting | Type | Default | Description |
|---|---|---|---|
| index.ingestion_source.warmup.timeout | TimeValue | -1 (disabled) | Warmup wait time. -1 = disabled, >=0 = enabled with that timeout |
| index.ingestion_source.warmup.lag_threshold | long | 100 | Acceptable pointer-based message lag for warmup completion |

Both settings are Final: they are set at index creation time and cannot be changed later.

State Machine

Index Created -> WARMING_UP -> (lag <= threshold OR timeout) -> POLLING -> STARTED

During WARMING_UP:

  • Poller is actively ingesting from the source (internally POLLING/PROCESSING)
  • But externally reports state as WARMING_UP via GET /{index}/ingestion/_state
  • Shard remains in INITIALIZING - does NOT serve search queries
  • Search returns 503 ("all shards failed")
  • If poller is paused, warmup is skipped immediately
  • On timeout, warmup completes with a warning log (shard proceeds to serve)
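
The split between the internal and the externally reported state can be sketched as follows. The method name `setStateWithWarmupAwareness` appears in this PR's commit log, but the body below is an assumption: in particular, masking only POLLING and PROCESSING while letting other states through is a guess at the behavior, and `PollerStateSketch` and `completeWarmup` are hypothetical names.

```java
/** Hypothetical sketch of warmup-aware state reporting; not the merged DefaultStreamPoller code. */
final class PollerStateSketch {
    enum State { NONE, WARMING_UP, POLLING, PROCESSING, PAUSED, CLOSED }

    private volatile State state = State.NONE;
    private volatile boolean warmupComplete;

    PollerStateSketch(boolean warmupEnabled) {
        this.warmupComplete = !warmupEnabled;
        if (warmupEnabled) {
            state = State.WARMING_UP;
        }
    }

    /** Keeps reporting WARMING_UP while the poller is internally polling or processing. */
    void setStateWithWarmupAwareness(State next) {
        boolean maskedDuringWarmup = next == State.POLLING || next == State.PROCESSING;
        if (!warmupComplete && maskedDuringWarmup) {
            return; // assumption: only the "working" states are hidden during warmup
        }
        state = next;
    }

    /** Called once lag is within the threshold or the timeout has elapsed. */
    void completeWarmup() {
        warmupComplete = true;
        state = State.POLLING;
    }

    /** The value a state API would report. */
    State getState() {
        return state;
    }
}
```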

Usage Example

Enable warmup with 5 minute timeout:

{
  "settings": {
    "index.ingestion_source.warmup.timeout": "5m"
  }
}

Custom warmup with lag threshold:

{
  "settings": {
    "index.ingestion_source.warmup.timeout": "10m",
    "index.ingestion_source.warmup.lag_threshold": 50
  }
}

Warmup disabled (default):

No settings needed. Default timeout=-1 means warmup is disabled.

Implementation Details

  • Added WarmupConfig record class in IngestionSource with two fields: timeout and lagThreshold
  • timeout = -1 means warmup disabled; timeout >= 0 means enabled
  • Warmup blocking occurs in IndexShard.postRecovery() before transitioning to POST_RECOVERY state
  • Timeout and error handling are encapsulated in IngestionEngine.awaitWarmupComplete()
  • Uses CountDownLatch for thread-safe blocking/signaling
  • cachedPointerBasedLag initialized to -1 to prevent premature warmup completion before lag data is available
  • Warmup state visible via GET /{index}/ingestion/_state API (poller_state: WARMING_UP)
  • When poller is paused during warmup, warmup completes immediately (skipped)
  • On timeout, warmup always proceeds with a warning log (no shard failure to avoid livelock)
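
A two-field record with the "timeout = -1 means disabled" convention might look like the sketch below. It is a stand-in, not the actual WarmupConfig in IngestionSource: the name `WarmupConfigSketch`, the `isEnabled()` helper, and the use of a millisecond `long` in place of `TimeValue` are all assumptions made to keep the example self-contained.

```java
/** Hypothetical stand-in for the WarmupConfig record; the timeout is in milliseconds here. */
record WarmupConfigSketch(long timeoutMillis, long lagThreshold) {

    /** Warmup is opt-in: -1 disables it, any value >= 0 enables it with that timeout. */
    boolean isEnabled() {
        return timeoutMillis >= 0;
    }
}
```

Folding the on/off switch into the timeout leaves a single value to validate and avoids the contradictory combination of "enabled" with no usable timeout.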

@github-actions github-actions bot added the enhancement and Indexing labels Feb 2, 2026
coderabbitai bot commented Feb 2, 2026

Walkthrough

Adds a warmup phase for pull-based ingestion to wait for lag to catch up before serving. Introduces six new configuration settings in IndexMetadata, propagates them through IngestionSource and IngestionEngine, and implements warmup state tracking and timeout handling in DefaultStreamPoller.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Configuration & Metadata: CHANGELOG.md, server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java, server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java | Added six warmup-related configuration settings (enabled, timeout, lag thresholds, time-based vs pointer-based lag, fail-on-timeout) to IndexMetadata constants and registered them in IndexScopedSettings. |
| Ingestion Source: server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java | Added six warmup configuration fields with corresponding getters and builder setters; updated constructor, equals, hashCode, and toString to include warmup properties. |
| Engine & Poller Lifecycle: server/src/main/java/org/opensearch/index/engine/IngestionEngine.java, server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java, server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java | Propagated warmup configuration to stream poller builder; extended StreamPoller interface with isWarmupComplete(), awaitWarmupComplete(timeoutMs), and new WARMING_UP state; implemented comprehensive warmup logic in DefaultStreamPoller with state tracking, lag checking, timeout handling, and synchronization. |
| Recovery Bootstrap: server/src/main/java/org/opensearch/index/shard/IndexShard.java | Modified postRecovery to await warmup completion after refresh, with configurable timeout and fail-on-timeout behavior. |
| Tests: server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java, server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java | Added warmup configuration tests for IngestionSource covering defaults, equality, builder semantics; expanded DefaultStreamPoller tests with warmup parameter propagation and new test cases for warmup behavior including disabled mode, immediate completion, timeout handling, and time-based lag. |

Sequence Diagram

sequenceDiagram
    participant Shard as IndexShard
    participant Engine as IngestionEngine
    participant Poller as DefaultStreamPoller
    participant Source as Stream Source
    
    Shard->>Engine: postRecovery()
    Engine->>Engine: refresh()
    Engine->>Poller: isWarmupComplete()?
    Poller->>Source: poll for lag
    alt Warmup Disabled
        Poller-->>Engine: true (warmup complete)
    else Lag Below Threshold
        Poller->>Poller: checkWarmupStatus()
        Poller-->>Engine: true (warmup complete)
    else Timeout Elapsed
        alt failOnTimeout = true
            Poller-->>Engine: throw exception
        else failOnTimeout = false
            Poller-->>Engine: true (proceed with warning)
        end
    end
    Engine->>Shard: warmup done, ready to serve

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

feature

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 23.53% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The PR title 'Add warmup phase for pull-based ingestion' clearly summarizes the main change and matches the primary objective of implementing a warmup phase for pull-based ingestion. |
| Linked Issues check | ✅ Passed | All code changes implement the requirements from issue #20506: warmup phase blocks shards until lag threshold is reached or timeout expires, supports both pointer-based and time-based lag, and includes configurable timeout and fail-on-timeout behavior. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to implementing the warmup feature: configuration settings added to IndexMetadata, warmup logic integrated into DefaultStreamPoller and IngestionEngine, postRecovery flow updated to wait for warmup, and comprehensive test coverage added. |
| Description check | ✅ Passed | The PR description comprehensively addresses the feature, objectives, implementation details, and provides clear usage examples with the new warmup settings. |


@kaustubhbutte17 kaustubhbutte17 force-pushed the feature/warmup-pull-ingestion branch from e239448 to 3869bb5 Compare February 2, 2026 16:53

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@CHANGELOG.md`:
- Line 28: Update the changelog entry so the link references the correct PR
number: replace the incorrect reference "#20506" with "#20526" in the line
containing "Add warmup phase to wait for lag to catch up in pull-based ingestion
before serving" so the hyperlink points to PR 20526 rather than an issue number.

In `@server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java`:
- Around line 995-1067: getIngestionSource currently reads several ingestion
settings but omits the six warmup settings declared (INGESTION_SOURCE_WARMUP_*)
so index-level warmup config is ignored; in getIngestionSource() read each
warmup setting from settings using INGESTION_SOURCE_WARMUP_ENABLED_SETTING,
INGESTION_SOURCE_WARMUP_TIMEOUT_SETTING,
INGESTION_SOURCE_WARMUP_LAG_THRESHOLD_SETTING,
INGESTION_SOURCE_WARMUP_USE_TIME_LAG_SETTING,
INGESTION_SOURCE_WARMUP_TIME_LAG_THRESHOLD_SETTING, and
INGESTION_SOURCE_WARMUP_FAIL_ON_TIMEOUT_SETTING and pass them into the
IngestionSource.Builder chain by calling the corresponding builder setters (e.g.
setWarmupEnabled, setWarmupTimeout, setWarmupLagThreshold, setWarmupUseTimeLag,
setWarmupTimeLagThreshold, setWarmupFailOnTimeout) so the builder receives the
index-level values.

In
`@server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java`:
- Around line 450-457: Add a log statement when the code falls back to
time-based lag in DefaultStreamPoller: inside the branch where (warmupUseTimeLag
|| cachedPointerBasedLag < 0) and before calling computeTimeBasedLag(), log that
the poller is using time-based lag (include values of cachedPointerBasedLag and
warmupTimeLagThreshold) so operators can see the active lag metric; reference
the variables/methods: warmupUseTimeLag, cachedPointerBasedLag,
computeTimeBasedLag(), warmupTimeLagThreshold, and the enclosing
DefaultStreamPoller class when adding the log.
🧹 Nitpick comments (4)
server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java (2)

909-911: Unused variable completed — consider asserting on return value or removing.

The return value of awaitWarmupComplete(2000) is stored but not used in the assertion. Either assert on completed directly, or remove the variable.

♻️ Suggested fix
-        boolean completed = warmupPoller.awaitWarmupComplete(2000);
-        // Even if lag never reaches threshold, warmup completes on timeout
-        assertTrue("Warmup should complete after timeout", warmupPoller.isWarmupComplete());
+        warmupPoller.awaitWarmupComplete(2000);
+        // Even if lag never reaches threshold, warmup completes on timeout
+        assertTrue("Warmup should complete after timeout", warmupPoller.isWarmupComplete());

Or alternatively, assert on the return value:

-        boolean completed = warmupPoller.awaitWarmupComplete(2000);
-        // Even if lag never reaches threshold, warmup completes on timeout
-        assertTrue("Warmup should complete after timeout", warmupPoller.isWarmupComplete());
+        // awaitWarmupComplete may return false due to its own timeout, 
+        // but warmup should be complete due to internal timeout
+        warmupPoller.awaitWarmupComplete(2000);
+        assertTrue("Warmup should complete after internal timeout", warmupPoller.isWarmupComplete());

946-947: Thread.sleep(200) may cause test flakiness.

Fixed sleep times in tests can lead to flaky behavior in CI environments with varying load. Consider using assertBusy() or waitUntil() as used elsewhere in this test class.

♻️ Suggested fix
         warmupPoller.start();
 
-        // Give it time to start
-        Thread.sleep(200);
+        // Wait for warmup to start
+        waitUntil(() -> warmupPoller.getState() != StreamPoller.State.NONE, awaitTime, TimeUnit.MILLISECONDS);
server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java (1)

182-187: Minor: Objects.equals() on primitives causes autoboxing.

Using Objects.equals() on primitive types (boolean, long) causes unnecessary autoboxing. While functionally correct, using == for primitive comparisons is more efficient.

♻️ Suggested fix
             && Objects.equals(mapperType, ingestionSource.mapperType)
-            && Objects.equals(warmupEnabled, ingestionSource.warmupEnabled)
+            && warmupEnabled == ingestionSource.warmupEnabled
             && Objects.equals(warmupTimeout, ingestionSource.warmupTimeout)
-            && Objects.equals(warmupLagThreshold, ingestionSource.warmupLagThreshold)
-            && Objects.equals(warmupUseTimeLag, ingestionSource.warmupUseTimeLag)
-            && Objects.equals(warmupTimeLagThreshold, ingestionSource.warmupTimeLagThreshold)
-            && Objects.equals(warmupFailOnTimeout, ingestionSource.warmupFailOnTimeout);
+            && warmupLagThreshold == ingestionSource.warmupLagThreshold
+            && warmupUseTimeLag == ingestionSource.warmupUseTimeLag
+            && warmupTimeLagThreshold == ingestionSource.warmupTimeLagThreshold
+            && warmupFailOnTimeout == ingestionSource.warmupFailOnTimeout;
server/src/main/java/org/opensearch/index/engine/IngestionEngine.java (1)

684-689: Add isWarmupFailOnTimeout() to the StreamPoller interface for API consistency.

The instanceof check and cast to DefaultStreamPoller creates an inconsistent API surface. The other warmup methods (isWarmupComplete(), awaitWarmupComplete()) are declared in the StreamPoller interface, but isWarmupFailOnTimeout() requires knowledge of the concrete implementation.

♻️ Suggested approach

Add the method to the StreamPoller interface:

// In StreamPoller.java
boolean isWarmupFailOnTimeout();

Then in IngestionEngine.java:

 public boolean isWarmupFailOnTimeout() {
-    if (streamPoller instanceof DefaultStreamPoller) {
-        return ((DefaultStreamPoller) streamPoller).isWarmupFailOnTimeout();
-    }
-    return false;
+    return streamPoller.isWarmupFailOnTimeout();
 }
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e8b9f1a and e239448.

📒 Files selected for processing (10)
  • CHANGELOG.md
  • server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
  • server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java
  • server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
  • server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java
  • server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
  • server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java
  • server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java
  • server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-13T17:40:27.167Z
Learnt from: reta
Repo: opensearch-project/OpenSearch PR: 20411
File: server/src/main/java/org/opensearch/index/codec/CodecService.java:112-133
Timestamp: 2026-01-13T17:40:27.167Z
Learning: Avoid capturing or evaluating a supplier (e.g., this::defaultCodec) upfront when passing it to a registry during object construction. If registries may replace defaults during iteration (as in EnginePlugin.getAdditionalCodecs), pass the supplier itself and only resolve it at use time. This ensures dynamic behavior is preserved during initialization and prevents premature binding of defaults in codecs/registry setup. This pattern should apply to similar initialization paths in Java server code where registries may mutate defaults during construction.

Applied to files:

  • server/src/main/java/org/opensearch/index/engine/IngestionEngine.java
  • server/src/main/java/org/opensearch/index/shard/IndexShard.java
  • server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java
  • server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java
  • server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java
  • server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java
  • server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: gradle-check
🔇 Additional comments (15)
server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java (2)

86-97: No changes needed. The warmup methods are properly implemented in the only StreamPoller implementation (DefaultStreamPoller), and the interface is marked @ExperimentalApi, where breaking changes are expected and documented. The suggested default methods returning true would incorrectly bypass the warmup synchronization logic and defeat the feature's purpose.

Likely an incorrect or invalid review comment.


102-109: State handling for WARMING_UP is complete—no action needed.

The codebase does not use switch statements on StreamPoller.State that would require explicit WARMING_UP handling. Instead, state transitions are managed via conditional assignments: WARMING_UP is set when warmup is enabled (line 237), and subsequent states (CLOSED, PAUSED, POLLING, PROCESSING) are assigned based on runtime conditions in the main loop. The only state comparison (while (state != State.CLOSED)) doesn't require a WARMING_UP case. The state machine design is inherently safe and avoids fall-through to default/illegal paths.

Likely an incorrect or invalid review comment.

server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)

2476-2499: Confirm warmup wait exits promptly on shard close.
The warmup wait runs while holding postRecoveryMutex. If the shard closes while waiting, this could hold the mutex until timeout and delay shutdown unless awaitWarmupComplete aborts on engine close/interrupt. Please confirm it does, or add a closed-state/cancellation check to avoid long waits during shutdown.
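
One way to address the concern above, sketched under assumptions: have the close path release the same latch the waiter blocks on, so a shard shutdown ends the wait immediately instead of running out the timeout. `CancellableWarmupSketch`, its `Outcome` enum, and its methods are hypothetical names; whether the merged awaitWarmupComplete behaves this way is not stated in this thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a warmup wait that returns promptly when the owner is closed. */
final class CancellableWarmupSketch {
    enum Outcome { CAUGHT_UP, TIMED_OUT, CLOSED }

    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean closed;

    /** Called by the poller once lag is within the threshold. */
    void markCaughtUp() {
        done.countDown();
    }

    /** Called on shard or engine close; releases any waiter right away. */
    void close() {
        closed = true; // set before countDown so a released waiter observes it
        done.countDown();
    }

    /** Blocks up to timeoutMillis and reports why the wait ended. */
    Outcome await(long timeoutMillis) {
        try {
            boolean released = done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            if (closed) {
                return Outcome.CLOSED;
            }
            return released ? Outcome.CAUGHT_UP : Outcome.TIMED_OUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption like a shutdown request
            return Outcome.CLOSED;
        }
    }
}
```

With this shape, a caller holding a mutex during the wait would hold it only until `close()` is invoked, not for the full warmup timeout.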

server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java (2)

110-112: toString expectation updated appropriately.
This keeps the test aligned with the new warmup fields.


141-222: Good warmup configuration coverage.
These tests nicely cover defaults, equality, and copy semantics for the new warmup fields.

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java (1)

292-297: Warmup settings registered correctly.
Adding these to the built-in index settings set is the right place to ensure validation and visibility.

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java (1)

150-156: LGTM!

Warmup configuration is cleanly propagated from IngestionSource to the stream poller builder.

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java (2)

98-105: LGTM!

Correctly disabling warmup for existing tests preserves backward compatibility and test stability.


801-833: LGTM — Good coverage of warmup scenarios.

The new warmup tests cover:

  • Disabled warmup behavior
  • Immediate completion when lag threshold is met
  • Timeout behavior
  • Time-based lag configuration
  • isWarmupFailOnTimeout flag

Also applies to: 835-873, 956-989, 991-1045

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java (2)

52-58: LGTM!

Warmup configuration fields, constructor assignments, and getters are cleanly implemented with appropriate encapsulation.

Also applies to: 90-96, 142-164


316-322: LGTM!

Builder warmup fields with defaults, copy constructor propagation, setters, and build() method are correctly implemented.

Also applies to: 338-343, 401-429, 443-449

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java (4)

60-69: LGTM!

Appropriate use of volatile for fields accessed across threads (warmupComplete, warmupStartTime) and a dedicated lock object for wait/notify coordination.

Also applies to: 192-204


234-239: LGTM!

Warmup initialization correctly sets the start time and transitions to WARMING_UP state before the polling loop begins.


419-440: LGTM!

The awaitWarmupComplete() implementation correctly handles the wait/notify pattern with timeout. The early return check outside the synchronized block using volatile read is an appropriate optimization, and the synchronized block properly handles races.


741-747: LGTM!

Builder defaults match the IndexMetadata settings as documented, and all warmup setters follow the fluent builder pattern consistently.

Also applies to: 847-921

github-actions bot commented Feb 2, 2026

❌ Gradle check result for 3869bb5: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

github-actions bot commented Feb 2, 2026

❌ Gradle check result for 4d0e177: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@varunbharadwaj varunbharadwaj marked this pull request as draft February 3, 2026 06:10
github-actions bot commented Feb 3, 2026

❌ Gradle check result for 0fa161c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@kaustubhbutte17 kaustubhbutte17 force-pushed the feature/warmup-pull-ingestion branch from 9f3fadc to 673d01d Compare February 3, 2026 09:26
@github-actions

Persistent review updated to latest commit f62071f

@github-actions

❌ Gradle check result for f62071f: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions

Persistent review updated to latest commit c51dfa0

@github-actions

❌ Gradle check result for c51dfa0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?


This PR introduces a warmup phase for pull-based ingestion that prevents
shards from serving queries until they have caught up with the streaming
source. This is analogous to how push-based replication waits for replicas
to sync before serving.

Key changes:
- Add warmup settings: enabled (default: true), timeout, lag_threshold, fail_on_timeout
- Create WarmupConfig class to encapsulate warmup configuration
- Block shard in postRecovery() until warmup completes or times out
- Use CountDownLatch for thread-safe warmup blocking/signaling
- Encapsulate timeout and error handling in IngestionEngine.awaitWarmupComplete()

New index settings:
- index.ingestion_source.warmup.enabled (boolean, default: true)
- index.ingestion_source.warmup.timeout (time, default: 5m)
- index.ingestion_source.warmup.lag_threshold (long, default: 0)
- index.ingestion_source.warmup.fail_on_timeout (boolean, default: false)

Resolves opensearch-project#20506

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
…to 100

- Change default lag_threshold from 0 to 100 messages per msfroh's suggestion
- Add WarmupConfig.DEFAULT constant for cleaner initialization
- Refactor DefaultStreamPoller to accept WarmupConfig object instead of 4 individual parameters
- Extract warmup handling in IndexShard to handlePullBasedIngestionWarmup() method
- Update tests to use WarmupConfig

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
- Add isPaused() check in IngestionEngine.awaitWarmupComplete() to skip waiting
- Add paused check in DefaultStreamPoller.checkWarmupStatus() to mark warmup complete

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
- Add setStateWithWarmupAwareness method to keep WARMING_UP state during warmup
- Replace direct state assignments with setStateWithWarmupAwareness calls
- Update CHANGELOG with latest from main

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
- Add testWarmupPhase IT to verify warmup functionality end-to-end
- Fix IngestionSourceTests to expect lagThreshold=100 (new default)

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
- Remove WarmupConfig.DEFAULT constant to avoid defaults drifting from
  IndexMetadata settings (msfroh)
- Replace waitForSearchableDocs with direct refresh + assert in warmup IT
  to catch warmup issues immediately (varunbharadwaj)
- Rename checkWarmupStatus() to updateWarmupStatus() for clarity
  (varunbharadwaj)

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
…mpletion

Before lag is first fetched from the consumer, cachedPointerBasedLag was 0 which
could satisfy lagBelowThreshold check (0 <= threshold) causing warmup to complete
before real lag data is available. Using -1 as sentinel ensures the existing
'currentLag >= 0' guard prevents premature completion.

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
Tests that don't test warmup behavior need warmup.enabled=false since
warmup (enabled by default) blocks shard initialization, causing
createIndex assertAcked to fail with 'not all shards were started'.

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
- Change @ExperimentalApi to @PublicApi(since = "3.6.0") on WarmupConfig
- Remove unused isWarmupComplete() from IngestionEngine
- Add comment on cachedPointerBasedLag = -1 explaining sentinel value

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
…eanup

- Convert WarmupConfig from class to record (andrross)
- Change warmup default to enabled=false (varun - new feature, opt-in)
- Remove separate warmup convenience methods from IngestionSource,
  callers now use getWarmupConfig().xyz() directly
- Remove unused isWarmupFailOnTimeout() from IngestionEngine
- Fix outdated javadoc: cachedPointerBasedLag is -1 not 0
- Fix CHANGELOG to only contain our entry

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
Per andrross review feedback:
- Remove warmup.enabled setting: use timeout=-1 (disabled) vs timeout>=0 (enabled)
- Remove warmup.fail_on_timeout setting: always proceed with warning on timeout
  to avoid shard init failure loops (livelock risk)
- WarmupConfig record now has only 2 fields: timeout and lagThreshold
- Default: warmup disabled (timeout=-1), opt-in by setting a positive timeout
- Adapt to upstream Indexer refactor (getEngine -> getIndexer/EngineBackedIndexer)

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
@kaustubhbutte17 kaustubhbutte17 force-pushed the feature/warmup-pull-ingestion branch from c51dfa0 to 911ce7f Compare March 18, 2026 19:51
@github-actions

Persistent review updated to latest commit 911ce7f

@github-actions

✅ Gradle check result for 911ce7f: SUCCESS

@msfroh msfroh left a comment

Looks good. I agree with @varunbharadwaj that we should probably make these settings (at least the magnitude of lag) dynamic in a subsequent PR.

Thanks, @kaustubhbutte17!

@msfroh msfroh merged commit e67f291 into opensearch-project:main Mar 19, 2026
36 checks passed
kkewwei pushed a commit to kkewwei/OpenSearch that referenced this pull request Mar 20, 2026
This PR introduces a warmup phase for pull-based ingestion that prevents
shards from serving queries until they have caught up with the streaming
source. This is analogous to how push-based replication waits for replicas
to sync before serving.

Key changes:
- Add warmup settings: enabled (default: true), timeout, lag_threshold, fail_on_timeout
- Create WarmupConfig class to encapsulate warmup configuration
- Block shard in postRecovery() until warmup completes or times out
- Use CountDownLatch for thread-safe warmup blocking/signaling
- Encapsulate timeout and error handling in IngestionEngine.awaitWarmupComplete()

---------

Signed-off-by: Kaustubh Butte <kaustubhbutte17@gmail.com>
Signed-off-by: kkewwei <kkewwei@163.com>
Labels

enhancement (Enhancement or improvement to existing feature or request), Indexing (Indexing, Bulk Indexing and anything related to indexing)

Development

Successfully merging this pull request may close these issues.

[Feature Request] Wait for lag to catch up in pull-based ingestion

4 participants