Skip to content

feat(log-ingestor): Add Listener to asynchronously receive and buffer S3 object metadata for compression job submission.#1552

Merged
LinZhihao-723 merged 6 commits into
y-scope:mainfrom
LinZhihao-723:buffer
Nov 7, 2025
Merged

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Nov 5, 2025

Copy link
Copy Markdown
Member

Description

As the title suggests. This PR introduces Listener to receive S3 object metadata sent from the ingestor front end. The received metadata will be buffered by the Buffer object and eventually submitted by invoking a submitter that implements BufferSubmitter (not implemented in this PR). The buffer will be submitted if:

  • A configurable size threshold is reached when a new object is received by the listener.
  • A configurable timeout is reached (no objects received within a certain amount of time).
  • A cancel signal is received.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change, and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Add test cases to ensure the size-based submission and timeout-based submission work as expected.
  • The entire architecture (despite the implementation details being different) has been tested in the log-ingestor prototype.

Summary by CodeRabbit

  • New Features

    • Added a compression module to buffer and submit S3 object metadata via a reusable buffer and background listener, with a default listener capacity and a lightweight object-metadata type.
  • Tests

    • Added integration tests covering concurrent senders, size-triggered submits, timeout-driven submission, and end-to-end submission counts.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner November 5, 2025 00:10
@LinZhihao-723 LinZhihao-723 requested a review from Copilot November 5, 2025 00:10
@coderabbitai

coderabbitai Bot commented Nov 5, 2025

Copy link
Copy Markdown
Contributor

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including **/dist/** will override the default block on the dist directory, by removing the pattern from both the lists.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a new public ObjectMetadata type and a compression subsystem: a generic buffered Buffer with BufferSubmitter, a background Listener that batches and submits S3 metadata by size or timeout, crate dependency additions, and an async integration test for the listener.

Changes

Cohort / File(s) Summary
Core data structure
components/clp-rust-utils/src/s3.rs
Added pub struct ObjectMetadata { pub bucket: String, pub key: String, pub size: usize } with derives Debug, Clone, PartialEq, Eq, PartialOrd, Ord.
Dependencies
components/log-ingestor/Cargo.toml
Added crates: anyhow, async-trait, tokio (features: rt-multi-thread, macros, time), and tokio-util.
Module root & re-exports
components/log-ingestor/src/lib.rs, components/log-ingestor/src/compression.rs
Added pub mod compression;, declared mod buffer; and mod listener;, re-exported buffer::* and listener::*, and added pub const DEFAULT_LISTENER_CAPACITY: usize = 8.
Buffer implementation
components/log-ingestor/src/compression/buffer.rs
Added pub trait BufferSubmitter with async submit(&self, buffer: &[ObjectMetadata]) -> Result<()> and pub struct Buffer<Submitter> with new, add, submit, and internal buffering/threshold logic using anyhow and async-trait.
Listener implementation
components/log-ingestor/src/compression/listener.rs
Added Listener that spawns a background task buffering ObjectMetadata from an mpsc::Receiver, submits on capacity, timeout, or cancellation; exposes spawn, shutdown_and_join, and get_new_sender.
Tests
components/log-ingestor/tests/test_compression_listener.rs
Added async integration test with TestBufferSubmitter, helpers to generate/send objects, multiple concurrent senders, timeout-driven submission, and assertions validating submissions and counts.

Sequence Diagram(s)

sequenceDiagram
    participant Producer as Producers
    participant Sender as mpsc::Sender
    participant Listener as ListenerTask
    participant Buffer as Buffer
    participant Submitter as BufferSubmitter

    Producer->>Sender: send(ObjectMetadata)
    Sender->>Listener: deliver(ObjectMetadata)
    Listener->>Buffer: add(ObjectMetadata)
    alt size threshold reached
        Buffer->>Submitter: submit(batch)
        Submitter-->>Buffer: Result
        Buffer->>Buffer: clear()
    else timeout or cancellation
        Listener->>Buffer: submit()
        Buffer->>Submitter: submit(batch)
        Submitter-->>Buffer: Result
        Buffer->>Buffer: clear()
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20–30 minutes

  • Check async_trait usage and trait object/lifetime bounds for BufferSubmitter.
  • Validate tokio::select! logic, timer reset, and cancellation handling in listener.rs.
  • Verify buffer size accounting, threshold-triggered submit, and error propagation in buffer.rs.
  • Review integration test for timing/race flakiness and ensure assertions match expected submission semantics.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically describes the main change: adding a Listener component for asynchronous receipt and buffering of S3 object metadata for compression job submission.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces a compression listener system for buffering and submitting S3 object metadata. The implementation includes a buffer that accumulates metadata until a size threshold is reached, and a listener task that manages submission based on buffer capacity, timeout, or cancellation events.

  • Adds Buffer and Listener types to manage S3 object metadata buffering and submission
  • Implements timeout-based and capacity-based submission triggers
  • Includes comprehensive integration test for the listener functionality

Reviewed Changes

Copilot reviewed 7 out of 8 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
components/log-ingestor/src/lib.rs Exposes the compression module
components/log-ingestor/src/compression.rs Module definition with public exports and default capacity constant
components/log-ingestor/src/compression/buffer.rs Buffer implementation with BufferSubmitter trait and size-based submission logic
components/log-ingestor/src/compression/listener.rs Listener task with timeout, capacity, and cancellation-based submission triggers
components/log-ingestor/tests/test_compression_listener.rs Integration test validating concurrent submission and timeout behavior
components/clp-rust-utils/src/s3.rs Adds ObjectMetadata struct with bucket, key, and size fields
components/log-ingestor/Cargo.toml Adds dependencies for async operations and error handling
Cargo.lock Updates lock file with new dependencies

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread components/log-ingestor/src/compression/listener.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f798f1f and 6e33496.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
  • components/clp-rust-utils/src/s3.rs (1 hunks)
  • components/log-ingestor/Cargo.toml (1 hunks)
  • components/log-ingestor/src/compression.rs (1 hunks)
  • components/log-ingestor/src/compression/buffer.rs (1 hunks)
  • components/log-ingestor/src/compression/listener.rs (1 hunks)
  • components/log-ingestor/src/lib.rs (1 hunks)
  • components/log-ingestor/tests/test_compression_listener.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-20T22:20:45.733Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1448
File: components/clp-rust-utils/Cargo.toml:4-4
Timestamp: 2025-10-20T22:20:45.733Z
Learning: Rust edition 2024 is a valid edition value in Cargo.toml files. It was released with Rust 1.85 on February 20, 2025.

Applied to files:

  • components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-31T15:41:23.952Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1534
File: components/api-server/src/client.rs:129-133
Timestamp: 2025-10-31T15:41:23.952Z
Learning: In Rust 2024 edition, the syntax `+ use<>` in return-position `impl Trait` types (e.g., `-> impl Stream<Item = T> + use<>`) is valid and indicates that the opaque type captures no generic parameters from the surrounding scope. This is part of the new precise capture rules introduced in Rust 2024 edition.

Applied to files:

  • components/log-ingestor/Cargo.toml
🧬 Code graph analysis (3)
components/log-ingestor/tests/test_compression_listener.rs (2)
components/log-ingestor/src/compression/buffer.rs (3)
  • new (39-46)
  • submit (17-17)
  • submit (81-88)
components/log-ingestor/src/compression/listener.rs (1)
  • spawn (91-111)
components/log-ingestor/src/compression/listener.rs (2)
components/log-ingestor/src/compression/buffer.rs (4)
  • submit (17-17)
  • submit (81-88)
  • add (59-68)
  • new (39-46)
components/log-ingestor/tests/test_compression_listener.rs (2)
  • submit (30-34)
  • new (17-21)
components/log-ingestor/src/compression/buffer.rs (1)
components/log-ingestor/tests/test_compression_listener.rs (2)
  • submit (30-34)
  • new (17-21)
🔇 Additional comments (16)
components/log-ingestor/src/lib.rs (1)

1-1: LGTM!

Clean module declaration exposing the new compression subsystem to the crate's public API.

components/clp-rust-utils/src/s3.rs (1)

5-10: LGTM!

Well-designed metadata struct with appropriate derives. The derived Ord implementation provides deterministic ordering (by bucket, then key, then size), which is useful for testing and debugging.

components/log-ingestor/tests/test_compression_listener.rs (2)

38-58: LGTM!

Well-designed test helper functions with clear documentation. The use of unwrap() in send_to_listener is acceptable for test code.


60-128: Good integration test coverage with a minor note on timeout testing.

The test effectively validates:

  • Concurrent producers sending to the same listener
  • Size-based buffer submission (2 submissions)
  • Timeout-based submission (1 submission)
  • Proper cleanup via shutdown

One minor consideration: the timeout-based test at Line 101 (sleep(timeout + 1)) could be flaky on heavily loaded CI systems. This is a common trade-off in integration testing, and the current approach is acceptable.

components/log-ingestor/src/compression/listener.rs (4)

41-65: LGTM!

The event loop correctly implements the three submission triggers:

  1. Cancellation → submit and shutdown
  2. New metadata → buffer (with automatic size-based submission)
  3. Timeout → submit on inactivity

The timer reset at Line 63 happens after every iteration, implementing an inactivity timeout (time since last event) rather than an absolute timeout, which aligns with the documented behaviour of "timeout without receiving new object metadata."

Based on learnings


91-111: LGTM!

Clean spawning logic with proper ownership transfer. Using a child cancellation token is good practice, allowing graceful shutdown without affecting other potential child tasks.


124-127: LGTM!

Straightforward shutdown implementation. The cancellation token triggers the task's cancel branch, which submits any remaining buffered data before returning. Errors from the task (including panics) are properly propagated.


129-139: LGTM!

Good use of #[must_use] attribute and clear documentation explaining the cloning semantics and ordering guarantees for multiple senders. The implementation is straightforward and correct.

components/log-ingestor/src/compression.rs (1)

1-7: LGTM!

Clean module structure using the facade pattern. The wildcard re-exports (pub use) are idiomatic in Rust for exposing submodule contents through a parent module. DEFAULT_LISTENER_CAPACITY = 8 is a reasonable default for the channel buffer.

components/log-ingestor/src/compression/buffer.rs (6)

5-18: LGTM!

Well-designed trait with clear documentation. Taking &[ObjectMetadata] allows implementations to process the buffer without taking ownership, providing flexibility.


20-31: LGTM!

Well-encapsulated struct with appropriate private fields. The separation of total_size and size_threshold makes the threshold logic explicit.


39-46: LGTM!

Clean constructor using const fn. The initialization is straightforward and correct.


59-68: LGTM!

Correct implementation of the size-based buffering logic. The threshold check includes the newly added item, ensuring the buffer is submitted when the cumulative size reaches or exceeds the threshold.


81-88: LGTM!

Correct submission logic with appropriate error handling. The buffer is only cleared after successful submission, allowing for potential retry logic if submission fails.


90-93: LGTM!

Straightforward clear implementation that properly resets both the buffer and size tracking. Appropriate private visibility since this is only called internally.

components/log-ingestor/Cargo.toml (1)

7-11: Update Cargo.lock to synchronize with Cargo.toml changes.

The new dependencies have been added to Cargo.toml, but Cargo.lock is out of sync. Run cargo check or cargo update in components/log-ingestor/ to update the lock file before merging. This ensures deterministic builds as required by the CI pipeline.

⛔ Skipped due to learnings
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Comment thread components/log-ingestor/tests/test_compression_listener.rs

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me. There are some minor comments in terms of code.

I'm still feeling a bit uneasy about how we'll handle data persistence. Right now, we essentially have two buffers: the channel buffer and our buffer. Anything submitted to the channel buffer won't be written to disk until select! { receiver.recv()} runs.

I'm wondering if we should wrap Buffer in an arc-mutex and have the sender call Buffer::submit directly. However, this lock-based design will hurt performance. Alternatively, we could look into implementing a data-persistent channel. Or even further, we maybe should just use rabbitmq as a data-persistent channel?

Anyway, the current code is all good to be merged.

Comment thread components/log-ingestor/Cargo.toml Outdated
Comment thread components/log-ingestor/src/compression/listener.rs Outdated
Comment thread components/log-ingestor/src/compression/listener.rs
Comment thread components/log-ingestor/src/compression/buffer.rs
Comment thread components/log-ingestor/tests/test_compression_listener.rs
@LinZhihao-723

Copy link
Copy Markdown
Member Author

Looks good to me. There are some minor comments in terms of code.

I'm still feeling a bit uneasy about how we'll handle data persistence. Right now, we essentially have two buffers: the channel buffer and our buffer. Anything submitted to the channel buffer won't be written to disk until select! { receiver.recv()} runs.

I'm wondering if we should wrap Buffer in an arc-mutex and have the sender call Buffer::submit directly. However, this lock-based design will hurt performance. Alternatively, we could look into implementing a data-persistent channel. Or even further, we maybe should just use rabbitmq as a data-persistent channel?

Anyway, the current code is all good to be merged.

In our planning, we can persist the state using the following scheme:

  • The ingestion front end (scanner-based or SQS-based) will onboard the input file to our system by writing the file entry to a dedicated DB table in one transaction. This step is required (at least for the scanner-based ingestor) to recover from a failure. There should be a state column, and it is initialized to Buffered, meaning that it should be now in the buffered state.
  • The submitter will submit the buffer to create a compression job whenever a flush is triggered. Once the compression job is created, we will update the state column of submitted files to Submitted with the compression job ID. Technically, we can do this in one transaction, so it is atomic: either a job has been created with relevant files marked as Submitted, or nothing's submitted and the state is still Buffered.
  • When the compression job finishes, it needs to update the table to mark the state as Ingested.
  • When recovering from a failure:
    • Any file/object that has been onboarded by the ingestor frontend will be in state Buffered. So we can just walk through the DB table to find these entries and put them into the desired buffer.
    • If the atomic transaction mentioned above for submission is not implemented: we can still determine whether a job is an orphan job by checking whether the job ID attempts to update Ingested matches the submitted job ID. The buffer can be re-submitted and in that case the orphan job created by the previously failed ingestor won't be able to update the input file table as (1) the submitted job ID doesn't match; or (2) the state in the table is not Submitted.

That said, we don't need to explicitly persist any internal states for either the channel or the buffer. We rely on the DB update policy to ensure data persistence.

LinZhihao-723 and others added 2 commits November 6, 2025 17:40
Co-authored-by: hoophalab <200652805+hoophalab@users.noreply.github.com>
hoophalab
hoophalab previously approved these changes Nov 7, 2025

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm. one nitpick

Comment thread components/log-ingestor/src/compression/listener.rs Outdated
Co-authored-by: hoophalab <200652805+hoophalab@users.noreply.github.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0743637 and 75c582e.

📒 Files selected for processing (1)
  • components/log-ingestor/src/compression/listener.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
🧬 Code graph analysis (1)
components/log-ingestor/src/compression/listener.rs (2)
components/log-ingestor/src/compression/buffer.rs (4)
  • submit (17-17)
  • submit (81-88)
  • add (59-68)
  • new (39-46)
components/log-ingestor/tests/test_compression_listener.rs (2)
  • submit (30-34)
  • new (17-21)
🔇 Additional comments (6)
components/log-ingestor/src/compression/listener.rs (6)

1-12: LGTM: Imports are appropriate and complete.

All imports are necessary for the buffered listener implementation, including Tokio async primitives, cancellation support, and local compression types.


14-22: LGTM: ListenerTask structure is well-designed.

The private task struct correctly owns the receiver to prevent concurrent access, and the generic Submitter parameter provides flexibility for different submission strategies.


24-77: LGTM: Core listener logic is correctly implemented.

The select! loop properly handles all three submission triggers (cancellation, buffer capacity via Buffer::add, and timeout). The timer reset on line 74 after each iteration correctly implements "idle timeout" semantics: the timeout counter resets after any activity (receiving metadata or timer firing), and submission occurs if no activity happens for the configured duration. The receiver closure handling returns an error as intended, enforcing the contract that shutdown_and_join must be called before senders are dropped.


79-85: LGTM: Listener structure is appropriate.

The public struct correctly holds the sender for cloning to producers, a cancellation token for graceful shutdown, and a join handle for awaiting task completion.


124-138: LGTM: Shutdown logic is correct.

The method properly triggers cancellation and waits for the task to complete, ensuring the buffer is submitted before termination. Error propagation from the task is correctly handled.

Note: Messages sent after cancel_token.cancel() is called may not be processed if the cancellation branch wins the select! race. This is standard async cancellation behaviour and acceptable given the documented shutdown semantics.


140-150: LGTM: Sender distribution is well-documented and correctly implemented.

The method safely returns a cloned sender with clear documentation about ordering and concurrency semantics. The #[must_use] attribute appropriately prevents accidental misuse.

Comment thread components/log-ingestor/src/compression/listener.rs
@LinZhihao-723 LinZhihao-723 merged commit 8a5fca2 into y-scope:main Nov 7, 2025
22 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
…er S3 object metadata for compression job submission. (y-scope#1552)

Co-authored-by: hoophalab <200652805+hoophalab@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants