Skip to content

feat(log-ingestor)!: Switch buffer submission from soft timeout to hard timeout; Move buffer config from package config to job config (resolves #2136).#2150

Merged
LinZhihao-723 merged 7 commits into
y-scope:mainfrom
LinZhihao-723:resolve-2136
Apr 1, 2026

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Mar 31, 2026

Copy link
Copy Markdown
Member

Description

This PR resolves #2136.

This PR changes the compression listener's timeout behavior from soft to hard timeout, and moves buffer configuration from the per-package LogIngestor config to the per-job BaseConfig.

Hard timeout

Previously, the listener reset its timer on every select! loop iteration (soft timeout), meaning the buffer would only flush after a full timeout period of inactivity. Now, the timer starts when the first entry is added to an empty buffer and is not reset by subsequent entries. This guarantees that the oldest buffered object is submitted within the configured timeout duration, regardless of ingestion rate.

Key implementation changes:

  • Buffer::add now returns whether a size-threshold flush was triggered.
  • Buffer::is_empty is added to track buffer state transitions.
  • ListenerTask::run uses a timer_active flag and only resets the timer when the buffer transitions from empty to non-empty, or after a size-threshold flush leaves the buffer non-empty.

Per-job buffer config

Buffer settings (flush_threshold_bytes, timeout_sec, channel_capacity) are moved from the package-level LogIngestor config into a new BufferConfig struct embedded in BaseConfig (per-job). This allows different ingestion jobs to use different buffer configurations.

The following per-package config fields are removed:

  • buffer_flush_timeout / buffer_flush_timeout_sec
  • buffer_flush_threshold
  • channel_capacity

These are removed from the Rust LogIngestor struct, the Python LogIngestor class, the ClpDbIngestionConnector struct, the config template, and the Helm chart values/configmap.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • All workflows passed.
  • Added new test cases for hard timeout behavior in test_compression_listener.rs.
  • Verified no remaining references to the removed per-package buffer fields across components/ and tools/.

Summary by CodeRabbit

  • Configuration Updates

    • Removed global buffer/channel tuning options and moved buffering configuration to per-job settings with sensible defaults; templates and deployment values updated.
  • Behavior Changes

    • Buffering and listener timing refined: flush triggers, timer enable/disable and submission signalling adjusted for more predictable, timely flushes; improved startup/error handling for listener creation.
  • Tests

    • Added and updated tests validating timing, threshold and flush scenarios, including hard-timeout behaviour.
  • Chores

    • Updated Helm chart version to 0.2.1-dev.5.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner March 31, 2026 03:23
@coderabbitai

coderabbitai Bot commented Mar 31, 2026

Copy link
Copy Markdown
Contributor

Walkthrough

Global log-ingestor buffer/channel settings were removed and relocated to a per-job BufferConfig. Buffer API now signals when a submission is triggered. Listener timer semantics changed from soft (resetting) to hard (not reset on adds). Tests and deployment templates updated accordingly.

Changes

Cohort / File(s) Summary
Global config removal
components/clp-py-utils/clp_py_utils/clp_config.py, components/clp-rust-utils/src/clp_config/package/config.rs
Removed global buffer_flush_timeout, buffer_flush_threshold, and channel_capacity from public LogIngestor configuration models.
Per-job buffer config
components/clp-rust-utils/src/job_config/ingestion.rs
Added pub struct BufferConfig with defaults (4 GiB, 300 s, 16) and added buffer_config: BufferConfig to BaseConfig with #[serde(default)].
Buffer API & behaviour
components/log-ingestor/src/compression/buffer.rs
Buffer::add(...) return changed from Result<()> to Result<bool> to indicate if submission occurred; added pub const fn is_empty(&self) -> bool.
Listener control flow
components/log-ingestor/src/compression/listener.rs
Switched to hard timeout semantics: single pinned Sleep + timer_active flag, conditional timer arm, adjusted add/submit/timer interactions; Listener::spawn now returns Result<Self> and errors on invalid channel capacity; added cancellation logging.
Integration wiring
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Connector no longer reads global buffer fields; derives base_config from per-job config and passes base_config.buffer_config into submitter and Listener::spawn; context creation now returns Result and restart loading uses best-effort helper to skip failing jobs.
Tests
components/log-ingestor/tests/test_compression_listener.rs, components/log-ingestor/tests/test_ingestion_job.rs
Updated tests to use sized entry helpers, millisecond timeouts, added test_listener_hard_timeout, and updated test configs to set buffer_config: BufferConfig::default().
Deployment & templates
components/package-template/src/etc/clp-config.template.json.yaml, tools/deployment/package-helm/templates/configmap.yaml, tools/deployment/package-helm/values.yaml, tools/deployment/package-helm/Chart.yaml
Removed commented/global buffer/channel fields from templates and values; ConfigMap template no longer emits those fields; bumped Helm chart version to 0.2.1-dev.5.

Sequence Diagram(s)

mermaid
sequenceDiagram
participant Job as S3IngestionJob (config)
participant Listener as Listener Task
participant Buffer as Compression Buffer
participant Submitter as CompressionJobSubmitter
participant Store as Remote Store

Job->>Listener: spawn with BufferConfig (threshold, timeout, capacity)
Listener->>Buffer: add(entries)
alt add triggers size-threshold
    Buffer-->>Listener: returns true (submission triggered)
    Listener->>Submitter: submit buffered items
    Submitter->>Store: upload/compress
    Store-->>Submitter: ack
    Submitter-->>Listener: submission complete
    Listener->>Listener: if buffer empty -> disable timer
else add does not trigger submission
    Buffer-->>Listener: returns false
    Listener->>Listener: if buffer was empty -> enable timer
end
Note right of Listener: Timer arm fires independently (hard timeout)
Listener->>Buffer: on timeout -> submit if not empty
Buffer-->>Listener: submission result
Listener->>Submitter: submit buffered items

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • junhaoliao
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and specifically describes the two main changes: switching to hard timeout and moving buffer config from package to job config, directly addressing the primary objectives.
Linked Issues check ✅ Passed All coding requirements from issue #2136 are fully met: hard timeout mechanism implemented with timer not reset on ingestion but only after submission, buffer config moved to per-job level, and size-based flush preserved.
Out of Scope Changes check ✅ Passed All changes are directly scoped to implementing the hard timeout mechanism and moving buffer configuration from package to job level; no unrelated modifications detected.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

LinZhihao-723 and others added 2 commits March 30, 2026 23:24
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/src/compression/listener.rs (1)

53-95: ⚠️ Potential issue | 🟠 Major

Add biased; to the select! macro to enforce timer priority when entries are under timeout.

The select! macro without biased; polls branches in random order. When the timer is due and the receiver has data ready, the receiver may be polled first, allowing new entries to be consumed before the timeout branch executes. This violates the documented guarantee at lines 31–33 that "the oldest buffered entry has remained in the buffer for at least the configured timeout duration." Under sustained ingestion, this delays flushing past the deadline. Use biased; at line 54 to poll branches top-to-bottom, or reorder branches to place the timer first when strict timeout enforcement is required.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/log-ingestor/src/compression/listener.rs` around lines 53 - 95,
The select! invocation in the listener loop should be made biased so the timer
branch gets top-to-bottom priority and enforces the timeout guarantee: add the
token "biased;" as the first item inside the select! block (above the
cancel_token, receiver, and timer branches) so when the timer is ready it will
be polled before the receiver branch; keep the existing logic using timer,
timer_active, buffer, receiver, cancel_token, timeout and Instant::now()
unchanged so the timer reset/submit behavior remains intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/clp-rust-utils/src/job_config/ingestion.rs`:
- Around line 69-71: BufferConfig currently prevents partial per-job overrides
because its fields aren't using serde defaults; update the type so
deserialisation can omit individual fields: either add #[serde(default)] to the
BufferConfig struct declaration so the existing Default impl is used for missing
fields, or add #[serde(default)] to each BufferConfig field (e.g.,
flush_threshold_bytes, flush_interval_ms, max_buffer_bytes) so per-job overrides
like buffer_config.flush_threshold_bytes deserialize correctly; ensure
BufferConfig and the buffer_config field remain compatible with the existing
Default impl.

In `@components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs`:
- Around line 417-429: Recovered jobs are now using base_config.buffer_config
unconditionally which causes legacy rows (that never stored buffer_config) to
get BufferConfig::default() and change behavior; update the recovery path in
clp_ingestion.rs so when constructing the Listener (the Buffer::new call and the
Duration/ channel_capacity args used in Listener::spawn) you detect
missing/legacy buffer settings on the recovered job and fall back to the
package/deployment-level buffering values (the previous global/defaults) instead
of base_config.buffer_config defaults—use the job record’s metadata or a
compatibility helper to return the original package-level buffer values and pass
those into Buffer::new and Listener::spawn, preserving the previous buffering
behavior for legacy CompressionJobSubmitter-created jobs.

In `@components/log-ingestor/tests/test_compression_listener.rs`:
- Around line 77-89: The test is timing the sender rather than the listener
(timestamps like t_2, t_4, t_7 are taken after send(...).await), making
assertions flaky; update the tests (including test_compression_listener and
calls around assert_submission_count_at) to either use a deterministic Tokio
test clock (tokio::time::pause()/advance()) so the listener's hard-timeout is
driven deterministically, or capture the instants before awaiting send so the
timeout window matches when the listener arms it; alternatively increase the
timing margins (use a much larger slack than 2ms or replace
DEFAULT_TIMEOUT_SECONDS with a larger test-specific value) and apply the same
change to the other occurrences called out (lines around 118-119, 160-183,
193-218, 239-257) so assertions reflect the listener's deadline rather than the
sender scheduling.

---

Outside diff comments:
In `@components/log-ingestor/src/compression/listener.rs`:
- Around line 53-95: The select! invocation in the listener loop should be made
biased so the timer branch gets top-to-bottom priority and enforces the timeout
guarantee: add the token "biased;" as the first item inside the select! block
(above the cancel_token, receiver, and timer branches) so when the timer is
ready it will be polled before the receiver branch; keep the existing logic
using timer, timer_active, buffer, receiver, cancel_token, timeout and
Instant::now() unchanged so the timer reset/submit behavior remains intact.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: f7b1df51-4adf-431e-b968-66b05152304a

📥 Commits

Reviewing files that changed from the base of the PR and between 1d6b00f and 497b43d.

⛔ Files ignored due to path filters (1)
  • docs/src/_static/generated/log-ingestor-openapi.json is excluded by !**/generated/**
📒 Files selected for processing (12)
  • components/clp-py-utils/clp_py_utils/clp_config.py
  • components/clp-rust-utils/src/clp_config/package/config.rs
  • components/clp-rust-utils/src/job_config/ingestion.rs
  • components/log-ingestor/src/compression/buffer.rs
  • components/log-ingestor/src/compression/listener.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
  • components/log-ingestor/tests/test_compression_listener.rs
  • components/log-ingestor/tests/test_ingestion_job.rs
  • components/package-template/src/etc/clp-config.template.json.yaml
  • tools/deployment/package-helm/Chart.yaml
  • tools/deployment/package-helm/templates/configmap.yaml
  • tools/deployment/package-helm/values.yaml
💤 Files with no reviewable changes (5)
  • components/package-template/src/etc/clp-config.template.json.yaml
  • tools/deployment/package-helm/templates/configmap.yaml
  • components/clp-rust-utils/src/clp_config/package/config.rs
  • tools/deployment/package-helm/values.yaml
  • components/clp-py-utils/clp_py_utils/clp_config.py

Comment thread components/clp-rust-utils/src/job_config/ingestion.rs
Comment thread components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Comment thread components/log-ingestor/tests/test_compression_listener.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/tests/test_compression_listener.rs`:
- Around line 239-256: The assertion context strings are misleading: update the
messages passed to assert_submission_count_at for the two timeout checks to
reference the correct timestamps and phrasing (use t_5 and t_7 with
TIMEOUT/SLACK) and remove incorrect references to "t_10" and numbered "step"
labels; specifically edit the calls to assert_submission_count_at (the two uses
after the comment blocks) so their descriptive strings say something like
"phase: timer should not have fired at t_5 + TIMEOUT" and "phase: timeout flush
of [124] at t_7 + TIMEOUT" or similar clear phrasing that mentions t_5/t_7,
TIMEOUT/SLACK, and avoids step numbers.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 845bbdb4-c61c-43cf-8ff8-2f44ef4e1ddb

📥 Commits

Reviewing files that changed from the base of the PR and between 497b43d and d0be0b7.

📒 Files selected for processing (1)
  • components/log-ingestor/tests/test_compression_listener.rs

Comment thread components/log-ingestor/tests/test_compression_listener.rs
hoophalab
hoophalab previously approved these changes Mar 31, 2026

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. One question.

Validations:

  1. Listen to an SQS queue and set the timeout to 5 seconds.
  2. Upload a small file. ~5 seconds later, a compression job is created.
  3. Continuously upload small files. The first compression job is created ~5 seconds after the first file is uploaded.
  4. Stop uploading files. The last file is eventually compressed in all my tests.

Comment on lines +169 to +186
pub struct BufferConfig {
/// Size-based flush threshold in bytes.
///
/// The buffer is flushed to create a compression job once the total size of buffered
/// objects exceeds this threshold.
///
/// Defaults to 4 GiB.
pub flush_threshold_bytes: u64,

/// Time-based flush threshold in seconds.
///
/// This is a hard timeout. The buffer is flushed to create a compression job when the
/// oldest buffered object has remained in the buffer for at least this duration,
/// regardless of the total buffered size. The timer is not reset by newly ingested
/// objects.
///
/// Defaults to 300 seconds (5 minutes).
pub timeout_sec: u64,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm this is the intended behavior:

without #[serde(default = "...")] on each field, the user must either omit buffer_config entirely or specify all three fields. {"buffer_config": {"timeout_sec": 100}} would be invalid.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed by adding #[serde(default)] to BufferConfig. Added unit tests to make sure it works. Also tested myself with a partially overwritten buffer config when submitting ingestion jobs.

@hoophalab

Copy link
Copy Markdown
Contributor

per-package sounds like an adjective. How about this title:

feat(log-ingestor)!: Switch buffer submission from soft timeout to hard timeout; Move buffer config from package config to job config (resolves #2136).

@LinZhihao-723 LinZhihao-723 changed the title feat(log-ingestor)!: Switch buffer submission from soft timeout to hard timeout; Move buffer config from per-package to per-job (resolves #2136). feat(log-ingestor)!: Switch buffer submission from soft timeout to hard timeout; Move buffer config from package config to job config (resolves #2136). Mar 31, 2026
@LinZhihao-723 LinZhihao-723 requested a review from hoophalab March 31, 2026 16:49

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/clp-rust-utils/src/job_config/ingestion.rs`:
- Around line 167-197: Add OpenAPI schema constraints to BufferConfig by
annotating the fields (flush_threshold_bytes, timeout_sec, channel_capacity)
with #[schema(...)] attributes reflecting valid ranges (e.g., minimum values:
flush_threshold_bytes >= 1, timeout_sec >= 1, channel_capacity >= 1; optionally
set sensible maximums) to match other configs; if zero values must be rejected
at runtime, implement a ValidatedBufferConfig wrapper (analogous to
ValidatedSqsListenerConfig) that validates and converts from BufferConfig,
returning an error for channel_capacity == 0 or timeout_sec == 0 and documenting
defaults (e.g., 4 GiB and 300s) in the schema annotations.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 6dcf39bc-39f1-481c-85c8-9a1313cc46f0

📥 Commits

Reviewing files that changed from the base of the PR and between d0be0b7 and 7f19e10.

📒 Files selected for processing (1)
  • components/clp-rust-utils/src/job_config/ingestion.rs

Comment on lines +167 to +197
/// Configuration for buffer behavior.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct BufferConfig {
/// Size-based flush threshold in bytes.
///
/// The buffer is flushed to create a compression job once the total size of buffered
/// objects exceeds this threshold.
///
/// Defaults to 4 GiB.
pub flush_threshold_bytes: u64,

/// Time-based flush threshold in seconds.
///
/// This is a hard timeout. The buffer is flushed to create a compression job when the
/// oldest buffered object has remained in the buffer for at least this duration,
/// regardless of the total buffered size. The timer is not reset by newly ingested
/// objects.
///
/// Defaults to 300 seconds (5 minutes).
pub timeout_sec: u64,

/// Capacity of the internal buffer channel.
///
/// Defines the maximum number of objects that can be queued before being processed by the
/// buffer. Increasing this value may improve throughput at the cost of higher memory
/// usage.
///
/// Defaults to 16.
pub channel_capacity: usize,
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider adding #[schema(...)] annotations for OpenAPI consistency.

Other config fields in this file (e.g., num_concurrent_listener_tasks, wait_time_sec) include #[schema(minimum, maximum)] constraints for API documentation. Adding similar annotations to BufferConfig fields would improve documentation consistency and help API consumers understand valid ranges.

Additionally, consider whether validation is needed for edge cases:

  • channel_capacity: 0 creates a Tokio rendezvous channel where send blocks until a receiver is ready, which could cause unexpected behavior.
  • timeout_sec: 0 would trigger immediate timeout flushes.

If these edge cases should be disallowed, a ValidatedBufferConfig wrapper (similar to ValidatedSqsListenerConfig) could enforce constraints.

📝 Example schema annotations
 pub struct BufferConfig {
+    #[schema(minimum = 1)]
     pub flush_threshold_bytes: u64,

+    #[schema(minimum = 1)]
     pub timeout_sec: u64,

+    #[schema(minimum = 1)]
     pub channel_capacity: usize,
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// Configuration for buffer behavior.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct BufferConfig {
/// Size-based flush threshold in bytes.
///
/// The buffer is flushed to create a compression job once the total size of buffered
/// objects exceeds this threshold.
///
/// Defaults to 4 GiB.
pub flush_threshold_bytes: u64,
/// Time-based flush threshold in seconds.
///
/// This is a hard timeout. The buffer is flushed to create a compression job when the
/// oldest buffered object has remained in the buffer for at least this duration,
/// regardless of the total buffered size. The timer is not reset by newly ingested
/// objects.
///
/// Defaults to 300 seconds (5 minutes).
pub timeout_sec: u64,
/// Capacity of the internal buffer channel.
///
/// Defines the maximum number of objects that can be queued before being processed by the
/// buffer. Increasing this value may improve throughput at the cost of higher memory
/// usage.
///
/// Defaults to 16.
pub channel_capacity: usize,
}
/// Configuration for buffer behavior.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
#[serde(default)]
pub struct BufferConfig {
/// Size-based flush threshold in bytes.
///
/// The buffer is flushed to create a compression job once the total size of buffered
/// objects exceeds this threshold.
///
/// Defaults to 4 GiB.
#[schema(minimum = 1)]
pub flush_threshold_bytes: u64,
/// Time-based flush threshold in seconds.
///
/// This is a hard timeout. The buffer is flushed to create a compression job when the
/// oldest buffered object has remained in the buffer for at least this duration,
/// regardless of the total buffered size. The timer is not reset by newly ingested
/// objects.
///
/// Defaults to 300 seconds (5 minutes).
#[schema(minimum = 1)]
pub timeout_sec: u64,
/// Capacity of the internal buffer channel.
///
/// Defines the maximum number of objects that can be queued before being processed by the
/// buffer. Increasing this value may improve throughput at the cost of higher memory
/// usage.
///
/// Defaults to 16.
#[schema(minimum = 1)]
pub channel_capacity: usize,
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/clp-rust-utils/src/job_config/ingestion.rs` around lines 167 -
197, Add OpenAPI schema constraints to BufferConfig by annotating the fields
(flush_threshold_bytes, timeout_sec, channel_capacity) with #[schema(...)]
attributes reflecting valid ranges (e.g., minimum values: flush_threshold_bytes
>= 1, timeout_sec >= 1, channel_capacity >= 1; optionally set sensible maximums)
to match other configs; if zero values must be rejected at runtime, implement a
ValidatedBufferConfig wrapper (analogous to ValidatedSqsListenerConfig) that
validates and converts from BufferConfig, returning an error for
channel_capacity == 0 or timeout_sec == 0 and documenting defaults (e.g., 4 GiB
and 300s) in the schema annotations.

@coderabbitai

coderabbitai Bot commented Mar 31, 2026

Copy link
Copy Markdown
Contributor

Caution

Review failed

An error occurred during the review process. Please try again later.

Walkthrough

Removed global buffer/channel tuning from LogIngestor and moved buffer settings to per-job BufferConfig. Listener timeout semantics changed from soft (resetting) to hard timeout with conditional timer activation. Buffer API adjusted to signal submissions. Tests and deployment templates updated accordingly.

Changes

Cohort / File(s) Summary
Global config removal
components/clp-py-utils/clp_py_utils/clp_config.py, components/clp-rust-utils/src/clp_config/package/config.rs
Removed buffer_flush_timeout, buffer_flush_threshold, and channel_capacity from the public LogIngestor models.
Per-job buffer config
components/clp-rust-utils/src/job_config/ingestion.rs
Added pub struct BufferConfig { flush_threshold_bytes, timeout_sec, channel_capacity } with Default (4 GiB, 300 s, 16) and added buffer_config: BufferConfig to BaseConfig with #[serde(default)].
Buffer API & behaviour
components/log-ingestor/src/compression/buffer.rs
Changed Buffer::add(...) return from Result<()> to Result<bool> to indicate whether a submission was triggered; added pub const fn is_empty(&self) -> bool.
Listener control flow
components/log-ingestor/src/compression/listener.rs
Refactored timer handling to use a single pinned Sleep and timer_active flag; the timer arm is conditional and enabled/disabled only on buffer transitions; adjusted cancellation logging and submission points (hard timeout semantics).
Integration wiring
components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
Removed connector-level buffer fields; derive base_config from per-job config and pass base_config.buffer_config.* into Listener::spawn and submitter creation.
Tests
components/log-ingestor/tests/test_compression_listener.rs, components/log-ingestor/tests/test_ingestion_job.rs
Updated tests to use millisecond timeouts, added helpers for sized test entries, added test_listener_hard_timeout, and updated tests to set buffer_config: BufferConfig::default().
Deployment & templates
components/package-template/src/etc/clp-config.template.json.yaml, tools/deployment/package-helm/templates/configmap.yaml, tools/deployment/package-helm/values.yaml, tools/deployment/package-helm/Chart.yaml
Removed commented/global buffer/channel fields from templates and values; ConfigMap template no longer emits removed fields; bumped Helm chart version to 0.2.1-dev.5.

Sequence Diagram(s)

mermaid
sequenceDiagram
participant Job as S3IngestionJob (config)
participant Listener as Listener Task
participant Buffer as Compression Buffer
participant Submitter as CompressionJobSubmitter
participant Store as Remote Store

Job->>Listener: spawn with BufferConfig (threshold, timeout, capacity)
Listener->>Buffer: add(entries)
alt add triggers size-threshold
    Buffer-->>Listener: returns true (submission triggered)
    Listener->>Submitter: submit buffered items
    Submitter->>Store: upload/compress
    Store-->>Submitter: ack
    Submitter-->>Listener: submission complete
    Listener->>Listener: if buffer empty -> disable timer
else add does not trigger submission
    Buffer-->>Listener: returns false
    Listener->>Listener: if buffer was empty -> enable timer
end
Note right of Listener: Timer arm fires independently (hard timeout)
Listener->>Buffer: on timeout -> submit if not empty
Buffer-->>Listener: submission result
Listener->>Submitter: submit buffered items

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested reviewers

  • junhaoliao
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly describes the two main changes: switching to hard timeout and moving buffer config to job config, which align with the changeset modifications.
Linked Issues check ✅ Passed The PR successfully implements the hard timeout mechanism (timer not reset on new ingestions) and per-job buffer config migration, addressing all requirements in issue #2136.
Out of Scope Changes check ✅ Passed All changes are directly related to the two stated objectives: hard timeout implementation and buffer config migration from package to job level.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@LinZhihao-723 LinZhihao-723 left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found an issue: if channel_capacity is 0, the channel creation will panic and thus kill the coroutine. In the current flow, we need to create the job entry in the ingestion job table before we create the channel. That job will be silently failed. And even worse, if log-ingestor restarts, it will fail forever because the panic will kill the restart coroutine.
To address this, we need to add a check to make sure the channel capacity is a positive integer. In the latest push, this is a solved problem.
The only issue left is: if channel size 0 is given, the user response would be "internal error" becuase we use anyhow result that can't differentiate it from real internal errors from other config errors. Two solutions:

  1. In clp_ingestion, we return a thiserror instead of using anyhow::Result.
  2. We pre-validate the buffer config as we did for the SQS listener config.

However, either way seems out of the scope for this PR. In the OpenAPI doc, we added a notice that a non-positive capacity will result in undefined behavior. That is probably enough for the current PR and release cycle, and the improvements should be tracked in an issue and planned for a future release.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@components/log-ingestor/tests/test_compression_listener.rs`:
- Around line 196-199: The test currently relies on a fragile timing heuristic
using tokio::time::sleep(SLACK).await before asserting shared.lock().await.len()
== 2; replace this with a deterministic approach by calling tokio::time::pause()
at the start of the test and then using tokio::time::advance(SLACK).await before
the assertion (or, if you prefer a simpler change, increase the SLACK constant
value to give more margin); update the equivalent occurrences around the second
assertion block (the region noted at lines 225–233) so both places use the same
deterministic time-control or larger SLACK to avoid CI flakiness.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: b48ae7e6-25b2-4c39-aa21-08601305d990

📥 Commits

Reviewing files that changed from the base of the PR and between 7f19e10 and bb5a6a4.

⛔ Files ignored due to path filters (1)
  • docs/src/_static/generated/log-ingestor-openapi.json is excluded by !**/generated/**
📒 Files selected for processing (4)
  • components/clp-rust-utils/src/job_config/ingestion.rs
  • components/log-ingestor/src/compression/listener.rs
  • components/log-ingestor/src/ingestion_job_manager/clp_ingestion.rs
  • components/log-ingestor/tests/test_compression_listener.rs

Comment thread components/log-ingestor/tests/test_compression_listener.rs
@LinZhihao-723 LinZhihao-723 merged commit 862fa26 into y-scope:main Apr 1, 2026
25 checks passed
@LinZhihao-723 LinZhihao-723 deleted the resolve-2136 branch April 1, 2026 01:22
@junhaoliao junhaoliao added this to the March 2026 milestone Apr 24, 2026
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
…rd timeout; Move buffer config from package config to job config (resolves y-scope#2136). (y-scope#2150)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[log-ingestor] Switch buffer submission from soft timeout to hard timeout

3 participants