feat(log-ingestor): Add Listener to asynchronously receive and buffer S3 object metadata for compression job submission.#1552
Important
Review skipped
Review was skipped due to path filters
⛔ Files ignored due to path filters (1)
CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including You can disable this status message by setting the
Note
Other AI code review bot(s) detected
CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
Adds a new public
Changes
Sequence Diagram(s)
sequenceDiagram
participant Producer as Producers
participant Sender as mpsc::Sender
participant Listener as ListenerTask
participant Buffer as Buffer
participant Submitter as BufferSubmitter
Producer->>Sender: send(ObjectMetadata)
Sender->>Listener: deliver(ObjectMetadata)
Listener->>Buffer: add(ObjectMetadata)
alt size threshold reached
Buffer->>Submitter: submit(batch)
Submitter-->>Buffer: Result
Buffer->>Buffer: clear()
else timeout or cancellation
Listener->>Buffer: submit()
Buffer->>Submitter: submit(batch)
Submitter-->>Buffer: Result
Buffer->>Buffer: clear()
end
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20–30 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Pull Request Overview
This PR introduces a compression listener system for buffering and submitting S3 object metadata. The implementation includes a buffer that accumulates metadata until a size threshold is reached, and a listener task that manages submission based on buffer capacity, timeout, or cancellation events.
- Adds `Buffer` and `Listener` types to manage S3 object metadata buffering and submission
- Implements timeout-based and capacity-based submission triggers
- Includes comprehensive integration test for the listener functionality
Reviewed Changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| components/log-ingestor/src/lib.rs | Exposes the compression module |
| components/log-ingestor/src/compression.rs | Module definition with public exports and default capacity constant |
| components/log-ingestor/src/compression/buffer.rs | Buffer implementation with BufferSubmitter trait and size-based submission logic |
| components/log-ingestor/src/compression/listener.rs | Listener task with timeout, capacity, and cancellation-based submission triggers |
| components/log-ingestor/tests/test_compression_listener.rs | Integration test validating concurrent submission and timeout behavior |
| components/clp-rust-utils/src/s3.rs | Adds ObjectMetadata struct with bucket, key, and size fields |
| components/log-ingestor/Cargo.toml | Adds dependencies for async operations and error handling |
| Cargo.lock | Updates lock file with new dependencies |
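
The table above describes `ObjectMetadata` as carrying bucket, key, and size fields. As a minimal sketch (the field types and the derive list here are assumptions for illustration, not copied from the PR), a derived `Ord` on such a struct compares fields in declaration order, so entries sort by bucket, then key, then size:

```rust
// Hypothetical stand-in for `ObjectMetadata`; field types and derives are
// assumptions made for this sketch.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ObjectMetadata {
    pub bucket: String,
    pub key: String,
    pub size: u64,
}

/// Returns the entries sorted by the derived ordering, which compares
/// `bucket` first, then `key`, then `size` (the declaration order).
pub fn sorted(mut entries: Vec<ObjectMetadata>) -> Vec<ObjectMetadata> {
    entries.sort();
    entries
}
```

A deterministic ordering like this lets a test sort the submitted batches before comparing them against the expected set, regardless of the order in which concurrent producers delivered them.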
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
components/clp-rust-utils/src/s3.rs (1 hunks)
components/log-ingestor/Cargo.toml (1 hunks)
components/log-ingestor/src/compression.rs (1 hunks)
components/log-ingestor/src/compression/buffer.rs (1 hunks)
components/log-ingestor/src/compression/listener.rs (1 hunks)
components/log-ingestor/src/lib.rs (1 hunks)
components/log-ingestor/tests/test_compression_listener.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-20T22:20:45.733Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1448
File: components/clp-rust-utils/Cargo.toml:4-4
Timestamp: 2025-10-20T22:20:45.733Z
Learning: Rust edition 2024 is a valid edition value in Cargo.toml files. It was released with Rust 1.85 on February 20, 2025.
Applied to files:
components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-31T15:41:23.952Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1534
File: components/api-server/src/client.rs:129-133
Timestamp: 2025-10-31T15:41:23.952Z
Learning: In Rust 2024 edition, the syntax `+ use<>` in return-position `impl Trait` types (e.g., `-> impl Stream<Item = T> + use<>`) is valid and indicates that the opaque type captures no generic parameters from the surrounding scope. This is part of the new precise capture rules introduced in Rust 2024 edition.
Applied to files:
components/log-ingestor/Cargo.toml
🧬 Code graph analysis (3)
components/log-ingestor/tests/test_compression_listener.rs (2)
components/log-ingestor/src/compression/buffer.rs (3)
new(39-46), submit(17-17), submit(81-88)
components/log-ingestor/src/compression/listener.rs (1)
spawn(91-111)
components/log-ingestor/src/compression/listener.rs (2)
components/log-ingestor/src/compression/buffer.rs (4)
submit(17-17), submit(81-88), add(59-68), new(39-46)
components/log-ingestor/tests/test_compression_listener.rs (2)
submit(30-34), new(17-21)
components/log-ingestor/src/compression/buffer.rs (1)
components/log-ingestor/tests/test_compression_listener.rs (2)
submit(30-34), new(17-21)
🔇 Additional comments (16)
components/log-ingestor/src/lib.rs (1)
1-1: LGTM! Clean module declaration exposing the new compression subsystem to the crate's public API.
components/clp-rust-utils/src/s3.rs (1)
5-10: LGTM! Well-designed metadata struct with appropriate derives. The derived `Ord` implementation provides deterministic ordering (by bucket, then key, then size), which is useful for testing and debugging.
components/log-ingestor/tests/test_compression_listener.rs (2)
38-58: LGTM! Well-designed test helper functions with clear documentation. The use of `unwrap()` in `send_to_listener` is acceptable for test code.
60-128: Good integration test coverage with a minor note on timeout testing. The test effectively validates:
- Concurrent producers sending to the same listener
- Size-based buffer submission (2 submissions)
- Timeout-based submission (1 submission)
- Proper cleanup via shutdown
One minor consideration: the timeout-based test at Line 101 (`sleep(timeout + 1)`) could be flaky on heavily loaded CI systems. This is a common trade-off in integration testing, and the current approach is acceptable.
components/log-ingestor/src/compression/listener.rs (4)
41-65: LGTM! The event loop correctly implements the three submission triggers:
- Cancellation → submit and shutdown
- New metadata → buffer (with automatic size-based submission)
- Timeout → submit on inactivity
The timer reset at Line 63 happens after every iteration, implementing an inactivity timeout (time since last event) rather than an absolute timeout, which aligns with the documented behaviour of "timeout without receiving new object metadata."
Based on learnings
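The inactivity-timeout behaviour described above can be sketched without Tokio. The version below is a simplified, synchronous stand-in: it swaps the PR's `select!` loop and cancellation token for `std::sync::mpsc::Receiver::recv_timeout`, tracks only object sizes as `u64` values, and treats the channel disconnecting as the shutdown signal. All names (`run_listener`, `flush`) are hypothetical, and collecting batches into a `Vec` stands in for calling a real submitter.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Moves any pending sizes into `batches` and resets the running total.
fn flush(pending: &mut Vec<u64>, total: &mut u64, batches: &mut Vec<Vec<u64>>) {
    if !pending.is_empty() {
        batches.push(std::mem::take(pending));
    }
    *total = 0;
}

/// Hypothetical listener loop: flushes when the cumulative size reaches
/// `size_threshold`, when no message arrives within `idle_timeout`, or when
/// every sender has been dropped. Returns the flushed batches in order.
pub fn run_listener(
    receiver: Receiver<u64>,
    size_threshold: u64,
    idle_timeout: Duration,
) -> Vec<Vec<u64>> {
    let mut batches = Vec::new();
    let mut pending = Vec::new();
    let mut total = 0u64;
    loop {
        // Each call starts a fresh wait, so the timeout measures inactivity
        // since the last event rather than an absolute deadline.
        match receiver.recv_timeout(idle_timeout) {
            Ok(size) => {
                total += size;
                pending.push(size);
                if total >= size_threshold {
                    flush(&mut pending, &mut total, &mut batches);
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                flush(&mut pending, &mut total, &mut batches);
            }
            Err(RecvTimeoutError::Disconnected) => {
                // Assumption of this sketch: disconnection plays the role of
                // cancellation, so remaining entries are flushed before exit.
                flush(&mut pending, &mut total, &mut batches);
                return batches;
            }
        }
    }
}
```

Note that this differs from the PR in one respect called out elsewhere in the review: there, a closed receiver is reported as an error and shutdown is driven by the cancellation token.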
91-111: LGTM! Clean spawning logic with proper ownership transfer. Using a child cancellation token is good practice, allowing graceful shutdown without affecting other potential child tasks.
124-127: LGTM! Straightforward shutdown implementation. The cancellation token triggers the task's cancel branch, which submits any remaining buffered data before returning. Errors from the task (including panics) are properly propagated.
129-139: LGTM! Good use of `#[must_use]` attribute and clear documentation explaining the cloning semantics and ordering guarantees for multiple senders. The implementation is straightforward and correct.
components/log-ingestor/src/compression.rs (1)
1-7: LGTM! Clean module structure using the facade pattern. The wildcard re-exports (`pub use`) are idiomatic in Rust for exposing submodule contents through a parent module. `DEFAULT_LISTENER_CAPACITY = 8` is a reasonable default for the channel buffer.
components/log-ingestor/src/compression/buffer.rs (6)
5-18: LGTM! Well-designed trait with clear documentation. Taking `&[ObjectMetadata]` allows implementations to process the buffer without taking ownership, providing flexibility.
20-31: LGTM! Well-encapsulated struct with appropriate private fields. The separation of `total_size` and `size_threshold` makes the threshold logic explicit.
39-46: LGTM! Clean constructor using `const fn`. The initialization is straightforward and correct.
59-68: LGTM! Correct implementation of the size-based buffering logic. The threshold check includes the newly added item, ensuring the buffer is submitted when the cumulative size reaches or exceeds the threshold.
81-88: LGTM! Correct submission logic with appropriate error handling. The buffer is only cleared after successful submission, allowing for potential retry logic if submission fails.
90-93: LGTM! Straightforward clear implementation that properly resets both the buffer and size tracking. Appropriate private visibility since this is only called internally.
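Taken together, the comments on `buffer.rs` describe a buffer that submits once the cumulative size reaches the threshold and clears itself only after a successful submission. A rough synchronous sketch of that shape follows. The names `total_size`, `size_threshold`, `add`, `submit`, and `clear` come from the review; the signatures, the error type, the `len` helper, and the empty-buffer early return are assumptions, and the PR's trait may well be async.

```rust
use std::error::Error;

// Hypothetical stand-in for the metadata type; field types are assumed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMetadata {
    pub bucket: String,
    pub key: String,
    pub size: u64,
}

/// Receives a batch by reference, so the buffer keeps ownership of it.
pub trait BufferSubmitter {
    fn submit(&mut self, batch: &[ObjectMetadata]) -> Result<(), Box<dyn Error>>;
}

pub struct Buffer<S: BufferSubmitter> {
    submitter: S,
    buf: Vec<ObjectMetadata>,
    total_size: u64,
    size_threshold: u64,
}

impl<S: BufferSubmitter> Buffer<S> {
    pub fn new(submitter: S, size_threshold: u64) -> Self {
        Self { submitter, buf: Vec::new(), total_size: 0, size_threshold }
    }

    /// Adds one entry; the threshold check includes the entry just added.
    pub fn add(&mut self, metadata: ObjectMetadata) -> Result<(), Box<dyn Error>> {
        self.total_size += metadata.size;
        self.buf.push(metadata);
        if self.total_size >= self.size_threshold {
            self.submit()?;
        }
        Ok(())
    }

    /// Submits the buffered entries. On error the entries are kept, which
    /// leaves room for a caller to retry.
    pub fn submit(&mut self) -> Result<(), Box<dyn Error>> {
        if self.buf.is_empty() {
            return Ok(()); // Assumption: nothing to submit is not an error.
        }
        self.submitter.submit(&self.buf)?;
        self.clear();
        Ok(())
    }

    /// Resets both the entries and the size tracking.
    fn clear(&mut self) {
        self.buf.clear();
        self.total_size = 0;
    }

    /// Number of entries currently buffered (helper added for this sketch).
    pub fn len(&self) -> usize {
        self.buf.len()
    }
}
```

Passing the batch as a slice means a failed submission costs nothing extra: the entries are still in the buffer and can be submitted again.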
components/log-ingestor/Cargo.toml (1)
7-11: Update Cargo.lock to synchronize with Cargo.toml changes. The new dependencies have been added to `Cargo.toml`, but `Cargo.lock` is out of sync. Run `cargo check` or `cargo update` in `components/log-ingestor/` to update the lock file before merging. This ensures deterministic builds as required by the CI pipeline.
⛔ Skipped due to learnings
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Looks good to me. There are some minor comments in terms of code.
I'm still a bit uneasy about how we'll handle data persistence. Right now, we essentially have two buffers: the channel buffer and our own `Buffer`. Anything sent to the channel won't be written to disk until `select! { receiver.recv() }` runs.
I'm wondering if we should wrap `Buffer` in an `Arc<Mutex<_>>` and have the sender call `Buffer::submit` directly. However, this lock-based design would hurt performance. Alternatively, we could look into implementing a data-persistent channel. Or, going a step further, maybe we should just use RabbitMQ as a data-persistent channel?
Anyway, the current code is all good to be merged.
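For reference, the lock-based alternative raised in this comment could look roughly like the sketch below. It is illustrative only and is not what the PR implements. `SharedBuffer` and `run_producers` are hypothetical names, entries are reduced to `u64` sizes, and pushing to `submitted_batches` stands in for a real submission. Every producer takes the same mutex on each `add`, which is the contention cost the comment mentions.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical buffer shared directly between producers.
#[derive(Debug, Default)]
pub struct SharedBuffer {
    pub pending: Vec<u64>,
    pub total_size: u64,
    pub submitted_batches: Vec<Vec<u64>>,
}

impl SharedBuffer {
    /// Adds one entry and "submits" once the cumulative size reaches the threshold.
    pub fn add(&mut self, size: u64, size_threshold: u64) {
        self.total_size += size;
        self.pending.push(size);
        if self.total_size >= size_threshold {
            self.submitted_batches.push(std::mem::take(&mut self.pending));
            self.total_size = 0;
        }
    }
}

/// Spawns `num_producers` threads that each add `per_producer` entries of
/// size 1, then returns the buffer once every producer has finished.
pub fn run_producers(
    num_producers: usize,
    per_producer: usize,
    size_threshold: u64,
) -> SharedBuffer {
    let shared = Arc::new(Mutex::new(SharedBuffer::default()));
    let handles: Vec<_> = (0..num_producers)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                for _ in 0..per_producer {
                    // The lock is held for the whole add, including submission.
                    shared.lock().expect("mutex poisoned").add(1, size_threshold);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("producer thread panicked");
    }
    // All clones were dropped when the producer threads finished.
    Arc::try_unwrap(shared)
        .expect("no other references remain")
        .into_inner()
        .expect("mutex poisoned")
}
```

In this design there is no intermediate channel, so an entry is either in the buffer or already submitted, at the price of serializing producers whenever a submission is slow.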
In our planning, we can persist the state using the following scheme:
That said, we don't need to explicitly persist any internal states for either the channel or the buffer. We rely on the DB update policy to ensure data persistence.
Co-authored-by: hoophalab <200652805+hoophalab@users.noreply.github.com>
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/log-ingestor/src/compression/listener.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
🧬 Code graph analysis (1)
components/log-ingestor/src/compression/listener.rs (2)
components/log-ingestor/src/compression/buffer.rs (4)
submit(17-17), submit(81-88), add(59-68), new(39-46)
components/log-ingestor/tests/test_compression_listener.rs (2)
submit(30-34), new(17-21)
🔇 Additional comments (6)
components/log-ingestor/src/compression/listener.rs (6)
1-12: LGTM: Imports are appropriate and complete. All imports are necessary for the buffered listener implementation, including Tokio async primitives, cancellation support, and local compression types.
14-22: LGTM: ListenerTask structure is well-designed. The private task struct correctly owns the receiver to prevent concurrent access, and the generic Submitter parameter provides flexibility for different submission strategies.
24-77: LGTM: Core listener logic is correctly implemented. The `select!` loop properly handles all three submission triggers (cancellation, buffer capacity via `Buffer::add`, and timeout). The timer reset on line 74 after each iteration correctly implements "idle timeout" semantics: the timeout counter resets after any activity (receiving metadata or timer firing), and submission occurs if no activity happens for the configured duration. The receiver closure handling returns an error as intended, enforcing the contract that `shutdown_and_join` must be called before senders are dropped.
79-85: LGTM: Listener structure is appropriate. The public struct correctly holds the sender for cloning to producers, a cancellation token for graceful shutdown, and a join handle for awaiting task completion.
124-138: LGTM: Shutdown logic is correct. The method properly triggers cancellation and waits for the task to complete, ensuring the buffer is submitted before termination. Error propagation from the task is correctly handled.
Note: Messages sent after `cancel_token.cancel()` is called may not be processed if the cancellation branch wins the `select!` race. This is standard async cancellation behaviour and acceptable given the documented shutdown semantics.
140-150: LGTM: Sender distribution is well-documented and correctly implemented. The method safely returns a cloned sender with clear documentation about ordering and concurrency semantics. The `#[must_use]` attribute appropriately prevents accidental misuse.
…er S3 object metadata for compression job submission. (y-scope#1552) Co-authored-by: hoophalab <200652805+hoophalab@users.noreply.github.com>
Description
As the title suggests. This PR introduces `Listener` to receive S3 object metadata sent from the ingestor front end. The received metadata will be buffered by the `Buffer` object and eventually submitted by invoking a submitter that implements `BufferSubmitter` (not implemented in this PR). The buffer will be submitted if:
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Tests