feat(log-ingestor)!: Switch buffer submission from soft timeout to hard timeout; Move buffer config from package config to job config (resolves #2136).#2150
Conversation
WalkthroughGlobal log-ingestor buffer/channel settings were removed and relocated to a per-job BufferConfig. Buffer API now signals when a submission is triggered. Listener timer semantics changed from soft (resetting) to hard (not reset on adds). Tests and deployment templates updated accordingly. Changes
Sequence Diagram(s)mermaid Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/src/compression/listener.rs (1)
53-95:⚠️ Potential issue | 🟠 MajorAdd
biased;to theselect!macro to enforce timer priority when entries are under timeout.The
select!macro withoutbiased;polls branches in random order. When the timer is due and the receiver has data ready, the receiver may be polled first, allowing new entries to be consumed before the timeout branch executes. This violates the documented guarantee at lines 31–33 that "the oldest buffered entry has remained in the buffer for at least the configured timeout duration." Under sustained ingestion, this delays flushing past the deadline. Usebiased;at line 54 to poll branches top-to-bottom, or reorder branches to place the timer first when strict timeout enforcement is required.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@components/log-ingestor/src/compression/listener.rs` around lines 53 - 95, The select! invocation in the listener loop should be made biased so the timer branch gets top-to-bottom priority and enforces the timeout guarantee: add the token "biased;" as the first item inside the select! block (above the cancel_token, receiver, and timer branches) so when the timer is ready it will be polled before the receiver branch; keep the existing logic using timer, timer_active, buffer, receiver, cancel_token, timeout and Instant::now() unchanged so the timer reset/submit behavior remains intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/clp-rust-utils/src/job_config/ingestion.rs`:
- Around line 69-71: BufferConfig currently prevents partial per-job overrides
because its fields aren't using serde defaults; update the type so
deserialisation can omit individual fields: either add #[serde(default)] to the
BufferConfig struct declaration so the existing Default impl is used for missing
fields, or add #[serde(default)] to each BufferConfig field (e.g.,
flush_threshold_bytes, flush_interval_ms, max_buffer_bytes) so per-job overrides
like buffer_config.flush_threshold_bytes deserialize correctly; ensure
BufferConfig and the buffer_config field remain compatible with the existing
Default impl.
In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 417-429: Recovered jobs are now using base_config.buffer_config
unconditionally which causes legacy rows (that never stored buffer_config) to
get BufferConfig::default() and change behavior; update the recovery path in
clp_ingestion.rs so when constructing the Listener (the Buffer::new call and the
Duration/ channel_capacity args used in Listener::spawn) you detect
missing/legacy buffer settings on the recovered job and fall back to the
package/deployment-level buffering values (the previous global/defaults) instead
of base_config.buffer_config defaults—use the job record’s metadata or a
compatibility helper to return the original package-level buffer values and pass
those into Buffer::new and Listener::spawn, preserving the previous buffering
behavior for legacy CompressionJobSubmitter-created jobs.
In `@components/log-ingestor/tests/test_compression_listener.rs`:
- Around line 77-89: The test is timing the sender rather than the listener
(timestamps like t_2, t_4, t_7 are taken after send(...).await), making
assertions flaky; update the tests (including test_compression_listener and
calls around assert_submission_count_at) to either use a deterministic Tokio
test clock (tokio::time::pause()/advance()) so the listener's hard-timeout is
driven deterministically, or capture the instants before awaiting send so the
timeout window matches when the listener arms it; alternatively increase the
timing margins (use a much larger slack than 2ms or replace
DEFAULT_TIMEOUT_SECONDS with a larger test-specific value) and apply the same
change to the other occurrences called out (lines around 118-119, 160-183,
193-218, 239-257) so assertions reflect the listener's deadline rather than the
sender scheduling.
---
Outside diff comments:
In `@components/log-ingestor/src/compression/listener.rs`:
- Around line 53-95: The select! invocation in the listener loop should be made
biased so the timer branch gets top-to-bottom priority and enforces the timeout
guarantee: add the token "biased;" as the first item inside the select! block
(above the cancel_token, receiver, and timer branches) so when the timer is
ready it will be polled before the receiver branch; keep the existing logic
using timer, timer_active, buffer, receiver, cancel_token, timeout and
Instant::now() unchanged so the timer reset/submit behavior remains intact.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f7b1df51-4adf-431e-b968-66b05152304a
⛔ Files ignored due to path filters (1)
docs/src/_static/generated/log-ingestor-openapi.jsonis excluded by!**/generated/**
📒 Files selected for processing (12)
components/clp-py-utils/clp_py_utils/clp_config.pycomponents/clp-rust-utils/src/clp_config/package/config.rscomponents/clp-rust-utils/src/job_config/ingestion.rscomponents/log-ingestor/src/compression/buffer.rscomponents/log-ingestor/src/compression/listener.rscomponents/log-ingestor/src/ingestion_job_manager/clp_ingestion.rscomponents/log-ingestor/tests/test_compression_listener.rscomponents/log-ingestor/tests/test_ingestion_job.rscomponents/package-template/src/etc/clp-config.template.json.yamltools/deployment/package-helm/Chart.yamltools/deployment/package-helm/templates/configmap.yamltools/deployment/package-helm/values.yaml
💤 Files with no reviewable changes (5)
- components/package-template/src/etc/clp-config.template.json.yaml
- tools/deployment/package-helm/templates/configmap.yaml
- components/clp-rust-utils/src/clp_config/package/config.rs
- tools/deployment/package-helm/values.yaml
- components/clp-py-utils/clp_py_utils/clp_config.py
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/tests/test_compression_listener.rs`:
- Around line 239-256: The assertion context strings are misleading: update the
messages passed to assert_submission_count_at for the two timeout checks to
reference the correct timestamps and phrasing (use t_5 and t_7 with
TIMEOUT/SLACK) and remove incorrect references to "t_10" and numbered "step"
labels; specifically edit the calls to assert_submission_count_at (the two uses
after the comment blocks) so their descriptive strings say something like
"phase: timer should not have fired at t_5 + TIMEOUT" and "phase: timeout flush
of [124] at t_7 + TIMEOUT" or similar clear phrasing that mentions t_5/t_7,
TIMEOUT/SLACK, and avoids step numbers.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 845bbdb4-c61c-43cf-8ff8-2f44ef4e1ddb
📒 Files selected for processing (1)
components/log-ingestor/tests/test_compression_listener.rs
hoophalab
left a comment
There was a problem hiding this comment.
LGTM. One question.
Validations:
- Listen to an SQS queue and set the timeout to 5 seconds.
- Upload a small file. ~5 seconds later, a compression job is created.
- Continuously upload small files. The first compression job is created ~5 seconds after the first file is uploaded.
- Stop uploading files. The last file is eventually compressed in all my tests.
| pub struct BufferConfig { | ||
| /// Size-based flush threshold in bytes. | ||
| /// | ||
| /// The buffer is flushed to create a compression job once the total size of buffered | ||
| /// objects exceeds this threshold. | ||
| /// | ||
| /// Defaults to 4 GiB. | ||
| pub flush_threshold_bytes: u64, | ||
|
|
||
| /// Time-based flush threshold in seconds. | ||
| /// | ||
| /// This is a hard timeout. The buffer is flushed to create a compression job when the | ||
| /// oldest buffered object has remained in the buffer for at least this duration, | ||
| /// regardless of the total buffered size. The timer is not reset by newly ingested | ||
| /// objects. | ||
| /// | ||
| /// Defaults to 300 seconds (5 minutes). | ||
| pub timeout_sec: u64, |
There was a problem hiding this comment.
Just to confirm this is the intended behavior:
without #[serde(default = "...")] on each field, the user must either omit buffer_config entirely or specify all three fields. {"buffer_config": {"timeout_sec": 100}} would be invalid.
There was a problem hiding this comment.
Fixed by adding #[serde(default)] to BufferConfig. Added unit tests to make sure it works. Also tested myself with a partially overwritten buffer config when submitting ingestion jobs.
|
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/clp-rust-utils/src/job_config/ingestion.rs`:
- Around line 167-197: Add OpenAPI schema constraints to BufferConfig by
annotating the fields (flush_threshold_bytes, timeout_sec, channel_capacity)
with #[schema(...)] attributes reflecting valid ranges (e.g., minimum values:
flush_threshold_bytes >= 1, timeout_sec >= 1, channel_capacity >= 1; optionally
set sensible maximums) to match other configs; if zero values must be rejected
at runtime, implement a ValidatedBufferConfig wrapper (analogous to
ValidatedSqsListenerConfig) that validates and converts from BufferConfig,
returning an error for channel_capacity == 0 or timeout_sec == 0 and documenting
defaults (e.g., 4 GiB and 300s) in the schema annotations.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 6dcf39bc-39f1-481c-85c8-9a1313cc46f0
📒 Files selected for processing (1)
components/clp-rust-utils/src/job_config/ingestion.rs
| /// Configuration for buffer behavior. | ||
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] | ||
| #[serde(default)] | ||
| pub struct BufferConfig { | ||
| /// Size-based flush threshold in bytes. | ||
| /// | ||
| /// The buffer is flushed to create a compression job once the total size of buffered | ||
| /// objects exceeds this threshold. | ||
| /// | ||
| /// Defaults to 4 GiB. | ||
| pub flush_threshold_bytes: u64, | ||
|
|
||
| /// Time-based flush threshold in seconds. | ||
| /// | ||
| /// This is a hard timeout. The buffer is flushed to create a compression job when the | ||
| /// oldest buffered object has remained in the buffer for at least this duration, | ||
| /// regardless of the total buffered size. The timer is not reset by newly ingested | ||
| /// objects. | ||
| /// | ||
| /// Defaults to 300 seconds (5 minutes). | ||
| pub timeout_sec: u64, | ||
|
|
||
| /// Capacity of the internal buffer channel. | ||
| /// | ||
| /// Defines the maximum number of objects that can be queued before being processed by the | ||
| /// buffer. Increasing this value may improve throughput at the cost of higher memory | ||
| /// usage. | ||
| /// | ||
| /// Defaults to 16. | ||
| pub channel_capacity: usize, | ||
| } |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider adding #[schema(...)] annotations for OpenAPI consistency.
Other config fields in this file (e.g., num_concurrent_listener_tasks, wait_time_sec) include #[schema(minimum, maximum)] constraints for API documentation. Adding similar annotations to BufferConfig fields would improve documentation consistency and help API consumers understand valid ranges.
Additionally, consider whether validation is needed for edge cases:
channel_capacity: 0creates a Tokio rendezvous channel wheresendblocks until a receiver is ready, which could cause unexpected behavior.timeout_sec: 0would trigger immediate timeout flushes.
If these edge cases should be disallowed, a ValidatedBufferConfig wrapper (similar to ValidatedSqsListenerConfig) could enforce constraints.
📝 Example schema annotations
pub struct BufferConfig {
+ #[schema(minimum = 1)]
pub flush_threshold_bytes: u64,
+ #[schema(minimum = 1)]
pub timeout_sec: u64,
+ #[schema(minimum = 1)]
pub channel_capacity: usize,
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Configuration for buffer behavior. | |
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] | |
| #[serde(default)] | |
| pub struct BufferConfig { | |
| /// Size-based flush threshold in bytes. | |
| /// | |
| /// The buffer is flushed to create a compression job once the total size of buffered | |
| /// objects exceeds this threshold. | |
| /// | |
| /// Defaults to 4 GiB. | |
| pub flush_threshold_bytes: u64, | |
| /// Time-based flush threshold in seconds. | |
| /// | |
| /// This is a hard timeout. The buffer is flushed to create a compression job when the | |
| /// oldest buffered object has remained in the buffer for at least this duration, | |
| /// regardless of the total buffered size. The timer is not reset by newly ingested | |
| /// objects. | |
| /// | |
| /// Defaults to 300 seconds (5 minutes). | |
| pub timeout_sec: u64, | |
| /// Capacity of the internal buffer channel. | |
| /// | |
| /// Defines the maximum number of objects that can be queued before being processed by the | |
| /// buffer. Increasing this value may improve throughput at the cost of higher memory | |
| /// usage. | |
| /// | |
| /// Defaults to 16. | |
| pub channel_capacity: usize, | |
| } | |
| /// Configuration for buffer behavior. | |
| #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] | |
| #[serde(default)] | |
| pub struct BufferConfig { | |
| /// Size-based flush threshold in bytes. | |
| /// | |
| /// The buffer is flushed to create a compression job once the total size of buffered | |
| /// objects exceeds this threshold. | |
| /// | |
| /// Defaults to 4 GiB. | |
| #[schema(minimum = 1)] | |
| pub flush_threshold_bytes: u64, | |
| /// Time-based flush threshold in seconds. | |
| /// | |
| /// This is a hard timeout. The buffer is flushed to create a compression job when the | |
| /// oldest buffered object has remained in the buffer for at least this duration, | |
| /// regardless of the total buffered size. The timer is not reset by newly ingested | |
| /// objects. | |
| /// | |
| /// Defaults to 300 seconds (5 minutes). | |
| #[schema(minimum = 1)] | |
| pub timeout_sec: u64, | |
| /// Capacity of the internal buffer channel. | |
| /// | |
| /// Defines the maximum number of objects that can be queued before being processed by the | |
| /// buffer. Increasing this value may improve throughput at the cost of higher memory | |
| /// usage. | |
| /// | |
| /// Defaults to 16. | |
| #[schema(minimum = 1)] | |
| pub channel_capacity: usize, | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@components/clp-rust-utils/src/job_config/ingestion.rs` around lines 167 -
197, Add OpenAPI schema constraints to BufferConfig by annotating the fields
(flush_threshold_bytes, timeout_sec, channel_capacity) with #[schema(...)]
attributes reflecting valid ranges (e.g., minimum values: flush_threshold_bytes
>= 1, timeout_sec >= 1, channel_capacity >= 1; optionally set sensible maximums)
to match other configs; if zero values must be rejected at runtime, implement a
ValidatedBufferConfig wrapper (analogous to ValidatedSqsListenerConfig) that
validates and converts from BufferConfig, returning an error for
channel_capacity == 0 or timeout_sec == 0 and documenting defaults (e.g., 4 GiB
and 300s) in the schema annotations.
|
Caution Review failedAn error occurred during the review process. Please try again later. WalkthroughRemoved global buffer/channel tuning from LogIngestor and moved buffer settings to per-job BufferConfig. Listener timeout semantics changed from soft (resetting) to hard timeout with conditional timer activation. Buffer API adjusted to signal submissions. Tests and deployment templates updated accordingly. Changes
Sequence Diagram(s)mermaid Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
LinZhihao-723
left a comment
There was a problem hiding this comment.
Found an issue: if channel_capacity is 0, the channel creation will panic and thus kill the coroutine. In the current flow, we need to create the job entry in the ingestion job table before we create the channel. That job will be silently failed. And even worse, if log-ingestor restarts, it will fail forever because the panic will kill the restart coroutine.
To address this, we need to add a check to make sure the channel capacity is a positive integer. In the latest push, this is a solved problem.
The only issue left is: if channel size 0 is given, the user response would be "internal error" becuase we use anyhow result that can't differentiate it from real internal errors from other config errors. Two solutions:
- In
clp_ingestion, we return a thiserror instead of usinganyhow::Result. - We pre-validate the buffer config as we did for the SQS listener config.
However, either way seems out of the scope for this PR. In the OpenAPI doc, we added a notice that a non-positive capacity will result in undefined behavior. That is probably enough for the current PR and release cycle, and the improvements should be tracked in an issue and planned for a future release.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@components/log-ingestor/tests/test_compression_listener.rs`:
- Around line 196-199: The test currently relies on a fragile timing heuristic
using tokio::time::sleep(SLACK).await before asserting shared.lock().await.len()
== 2; replace this with a deterministic approach by calling tokio::time::pause()
at the start of the test and then using tokio::time::advance(SLACK).await before
the assertion (or, if you prefer a simpler change, increase the SLACK constant
value to give more margin); update the equivalent occurrences around the second
assertion block (the region noted at lines 225–233) so both places use the same
deterministic time-control or larger SLACK to avoid CI flakiness.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: b48ae7e6-25b2-4c39-aa21-08601305d990
⛔ Files ignored due to path filters (1)
docs/src/_static/generated/log-ingestor-openapi.jsonis excluded by!**/generated/**
📒 Files selected for processing (4)
components/clp-rust-utils/src/job_config/ingestion.rscomponents/log-ingestor/src/compression/listener.rscomponents/log-ingestor/src/ingestion_job_manager/clp_ingestion.rscomponents/log-ingestor/tests/test_compression_listener.rs
…rd timeout; Move buffer config from package config to job config (resolves y-scope#2136). (y-scope#2150)
Description
This PR resolves #2136.
This PR changes the compression listener's timeout behavior from soft to hard timeout, and moves buffer configuration from the per-package
LogIngestorconfig to the per-jobBaseConfig.Hard timeout
Previously, the listener reset its timer on every
select!loop iteration (soft timeout), meaning the buffer would only flush after a full timeout period of inactivity. Now, the timer starts when the first entry is added to an empty buffer and is not reset by subsequent entries. This guarantees that the oldest buffered object is submitted within the configured timeout duration, regardless of ingestion rate.Key implementation changes:
Buffer::addnow returns whether a size-threshold flush was triggered.Buffer::is_emptyis added to track buffer state transitions.ListenerTask::runuses atimer_activeflag and only resets the timer when the buffer transitions from empty to non-empty, or after a size-threshold flush leaves the buffer non-empty.Per-job buffer config
Buffer settings (
flush_threshold_bytes,timeout_sec,channel_capacity) are moved from the package-levelLogIngestorconfig into a newBufferConfigstruct embedded inBaseConfig(per-job). This allows different ingestion jobs to use different buffer configurations.The following per-package config fields are removed:
buffer_flush_timeout/buffer_flush_timeout_secbuffer_flush_thresholdchannel_capacityThese are removed from the Rust
LogIngestorstruct, the PythonLogIngestorclass, theClpDbIngestionConnectorstruct, the config template, and the Helm chart values/configmap.Checklist
Validation performed
test_compression_listener.rs.components/andtools/.Summary by CodeRabbit
Configuration Updates
Behavior Changes
Tests
Chores