Skip to content

feat(log-ingestor): Add SqsListener to consume S3 notifications via SQS.#1602

Merged
LinZhihao-723 merged 22 commits into
y-scope:mainfrom
LinZhihao-723:sqs-listener
Nov 18, 2025
Merged

feat(log-ingestor): Add SqsListener to consume S3 notifications via SQS.#1602
LinZhihao-723 merged 22 commits into
y-scope:mainfrom
LinZhihao-723:sqs-listener

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Nov 14, 2025

Copy link
Copy Markdown
Member

Description

This PR introduces SqsListener, an SQS-based ingestion job implementation. This job listens to an SQS queue that subscribes to an S3 bucket. The S3 bucket will create event notifications, while SqsListener will consume these notifications by extracting the S3 object metadata and forwarding them into a downstream receiver (which should be a compression listener introduced in #1552).

Some key decisions made:

  • In this version, we assume that the SQS passed into SqsListener is dedicated to the particular ingestion job. CLP has the permission to delete any received message after successfully processing it.
    • For now, we won't configure any DLQ service for unused messages.
  • We design a client manager trait, AwsClientManager, to adapt the need for the coming credential manager service, where we need to refresh the AWS client with temporary credentials. In the current implementation, the client manager implementation should be just a simple wrapper of the raw AWS client.

This PR also adds test cases for emulated SQS services using LocalStack. By default, the tests can be executed with the task (same as the workflow). I also updated the component's README to include instructions for running these tests manually.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.
  • Add testing on top of LocalStack to test the basic functionality.

Summary by CodeRabbit

  • New Features

    • Configurable AWS endpoint for flexible environments.
    • New SQS listener with lifecycle API to stream relevant S3 object metadata to consumers.
    • AWS client management abstraction to simplify client usage.
  • Tests

    • LocalStack-enabled integration tests and test helpers for end-to-end ingestion.
    • Test run config updated to execute ignored/integration tests.
  • Documentation

    • README expanded with Testing quick-start and manual instructions.
  • Chores

    • Internal size representation adjusted (may affect integrations).

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner November 14, 2025 04:21
@coderabbitai

coderabbitai Bot commented Nov 14, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Adds SQS/S3 integration and tests to log-ingestor: wires an endpoint into the SQS client, changes an S3 event size field, adds a cancellable SqsListener that polls SQS and emits S3 object metadata, introduces an AWS client manager abstraction, test helpers and an integration test, and updates dependencies and README.

Changes

Cohort / File(s) Summary
SQS client change
components/clp-rust-utils/src/sqs/client.rs
Added an endpoint: &str parameter to create_new_client and wired it into the AWS config via .endpoint_url(endpoint).
Event type update
components/clp-rust-utils/src/sqs/event/s3.rs
Changed Object.size field type from u64 to usize.
Log-ingestor dependencies
components/log-ingestor/Cargo.toml
Added runtime dependencies: aws-sdk-s3, aws-sdk-sqs, secrecy (with serde feature), serde_json, uuid (v4); added dev-dependencies serial_test, stdext.
README / docs
components/log-ingestor/README.md
Added a "Testing" section with Quick Start and LocalStack/manual test instructions.
AWS client abstraction
components/log-ingestor/src/aws_client_manager.rs
New AwsClientType marker trait, AwsClientManagerType<Client> trait with async get(), and SqsClientWrapper implementation and constructor.
SQS listener implementation
components/log-ingestor/src/ingestion_job/sqs_listener.rs
New SqsListener and internal task: polling loop with cancellation and backoff, SQS ReceiveMessage/DeleteMessage handling, S3 event parsing, ObjectMetadata extraction and channel send; adds SqsListenerConfig.
Module exports
components/log-ingestor/src/ingestion_job.rs, components/log-ingestor/src/lib.rs
Declared mod sqs_listener; and pub use sqs_listener::*; and added pub mod aws_client_manager; and pub mod ingestion_job;.
Minor docs edit
components/log-ingestor/src/compression/listener.rs
Small documentation wording/format change only.
Test helpers
components/log-ingestor/tests/aws_config.rs
Added AwsConfig test helper with from_env() to load LocalStack test settings and required env var checks.
Integration test
components/log-ingestor/tests/test_ingestion_job.rs
Added an async integration test (ignored by default) that uploads S3 objects, runs SqsListener, collects metadata via channel, and asserts received vs created objects.
Test task config
taskfiles/tests/main.yaml
Replaced single cargo test call with a multi-line shell command that sets env vars and runs cargo nextest --run-ignored --release.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Test as Integration Test
    participant Uploader as S3 Uploader
    participant Listener as SqsListener Task
    participant SQS as SQS Service
    participant S3 as S3 Service
    participant Channel as mpsc::Sender
    participant Collector as Metadata Receiver

    Note over Test,Uploader: setup and uploads
    Test->>Uploader: spawn uploads (PutObject)
    Uploader->>S3: PutObject
    Note over Test,Listener: start listener
    Test->>Listener: SqsListener::spawn(config, sender)
    Listener->>SQS: ReceiveMessage (poll loop)
    SQS-->>Listener: messages (S3 ObjectCreated events)
    Listener->>Listener: parse & filter events
    alt relevant object
        Listener->>Channel: send ObjectMetadata
        Listener->>SQS: DeleteMessage (receipt handle)
    end
    Channel-->>Collector: deliver metadata
    Test->>Collector: gather results
    Test->>Listener: shutdown_and_join()
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Areas needing extra attention:

  • components/log-ingestor/src/ingestion_job/sqs_listener.rs — polling/backoff correctness, message parsing/filtering, deletion semantics, cancellation and task join behavior, channel backpressure.
  • components/log-ingestor/src/aws_client_manager.rs — trait bounds, async_trait usage, Send+Sync and cloning of AWS clients.
  • components/clp-rust-utils/src/sqs/client.rs — new endpoint parameter: call-site updates and endpoint URL behavior.
  • components/clp-rust-utils/src/sqs/event/s3.rs — change from u64 to usize: serde compatibility and cross-platform implications.
  • components/log-ingestor/tests/* and taskfiles/tests/main.yaml — LocalStack assumptions, test flakiness/timing, environment configuration.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately and specifically describes the main change: adding an SqsListener component to consume S3 notifications via SQS in the log-ingestor.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ce55415 and 6b7cc6f.

📒 Files selected for processing (1)
  • components/log-ingestor/tests/test_ingestion_job.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2024-10-01T07:59:11.208Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Applied to files:

  • components/log-ingestor/tests/test_ingestion_job.rs
📚 Learning: 2024-10-10T05:46:35.188Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 554
File: components/core/src/clp/ffi/KeyValuePairLogEvent.cpp:299-307
Timestamp: 2024-10-10T05:46:35.188Z
Learning: In the C++ function `get_schema_subtree_bitmap` in `KeyValuePairLogEvent.cpp`, when a loop uses `while (true)` with an internal check on `optional.has_value()`, and comments explain that this structure is used to silence `clang-tidy` warnings about unchecked optional access, this code is acceptable and should not be refactored to use `while (optional.has_value())`.

Applied to files:

  • components/log-ingestor/tests/test_ingestion_job.rs
🧬 Code graph analysis (1)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
  • from_env (47-74)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (15-36)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
components/log-ingestor/src/aws_client_manager.rs (1)
  • from (47-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-lint
  • GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
  • GitHub Check: musllinux_1_2-x86_64-static-linked-bins
  • GitHub Check: musllinux_1_2-x86_64-dynamic-linked-bins
  • GitHub Check: manylinux_2_28-x86_64-static-linked-bins
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: build-macos (macos-15, false)
  • GitHub Check: build-macos (macos-15, true)
  • GitHub Check: antlr-code-committed (macos-15)
🔇 Additional comments (3)
components/log-ingestor/tests/test_ingestion_job.rs (3)

45-68: LGTM! Previous feedback addressed.

The function has been successfully simplified to return Vec<ObjectMetadata> directly, removing the unnecessary Result wrapper and the problematic unwrap(). The implementation is now clean and straightforward.


80-96: SecretString reuse implemented correctly.

The SecretString is now created once and reused for both S3 and SQS client creation, addressing previous feedback efficiently.


99-118: Well-structured test with proper isolation and assertions.

The test demonstrates good practices:

  • UUID-based prefix ensures test isolation
  • Separate relevant and irrelevant objects validate filtering
  • Proper shutdown sequence for the listener
  • Sorted comparison handles unordered async message delivery correctly

Also applies to: 128-161


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is updated so that it's consistent with SqsListener.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dfb4526 and aaa69db.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • components/clp-rust-utils/src/sqs/client.rs (2 hunks)
  • components/clp-rust-utils/src/sqs/event/s3.rs (1 hunks)
  • components/log-ingestor/Cargo.toml (1 hunks)
  • components/log-ingestor/README.md (1 hunks)
  • components/log-ingestor/src/aws_client_manager.rs (1 hunks)
  • components/log-ingestor/src/compression/listener.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs (1 hunks)
  • components/log-ingestor/src/lib.rs (1 hunks)
  • components/log-ingestor/tests/aws_config.rs (1 hunks)
  • components/log-ingestor/tests/test_ingestion_job.rs (1 hunks)
  • taskfiles/tests/main.yaml (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/log-ingestor/README.md
  • components/log-ingestor/Cargo.toml
  • taskfiles/tests/main.yaml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/log-ingestor/Cargo.toml
  • taskfiles/tests/main.yaml
📚 Learning: 2025-10-20T21:05:30.417Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1413
File: tools/docker-images/clp-package/Dockerfile:22-24
Timestamp: 2025-10-20T21:05:30.417Z
Learning: In the clp repository's Dockerfiles, ENV directives should be consolidated into multi-line ENV statements when possible to reduce image layers. ENV statements should only be split into separate commands when consolidation is not possible due to dependencies (e.g., when later variables must reference earlier ones that need to be set first, or when PATH must be modified sequentially).

Applied to files:

  • taskfiles/tests/main.yaml
🧬 Code graph analysis (2)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
  • from_env (39-67)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (15-36)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (186-205)
components/log-ingestor/src/aws_client_manager.rs (1)
  • from (47-49)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (2)
components/log-ingestor/src/compression/listener.rs (3)
  • run (41-76)
  • spawn (102-122)
  • shutdown_and_join (135-138)
components/log-ingestor/src/aws_client_manager.rs (2)
  • get (30-30)
  • get (40-42)
🪛 GitHub Actions: clp-rust-checks
components/log-ingestor/tests/aws_config.rs

[error] 35-35: Rustfmt formatting check failed: diff detected in aws_config.rs (line 35). Run 'cargo +nightly fmt --all -- --check' or 'cargo +nightly fmt --all' to fix formatting.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
  • GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
  • GitHub Check: musllinux_1_2-x86_64-dynamic-linked-bins
  • GitHub Check: musllinux_1_2-x86_64-static-linked-bins
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: package-image
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: ubuntu-jammy-lint
  • GitHub Check: manylinux_2_28-x86_64-static-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: build-macos (macos-15, true)
  • GitHub Check: build-macos (macos-15, false)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (19)
components/log-ingestor/src/compression/listener.rs (3)

88-88: Documentation improvements are correctly applied.

The grammar refinement on line 88 ("Creates and spawns" vs. "Create and spawn") aligns with Rust documentation conventions for function descriptions. The formatting update on line 96 properly uses backticks to enable intra-doc linking to the Submitter trait, improving discoverability in generated documentation.

Also applies to: 96-96


41-76: Method logic is sound and properly handles async event multiplexing.

The run() method correctly uses tokio::select! to handle three exit/continuation paths. Timer reset on line 74 is appropriately placed outside the match arms so it executes only when the loop continues (after metadata receipt or timeout), not when early returns occur (cancellation or channel closure). The timeout interval resets consistently after each significant event, ensuring predictable buffering behaviour.


102-151: Public API design is well-structured and properly documented.

The spawn(), get_new_sender(), and shutdown_and_join() methods present a clear, idiomatic async API. The use of #[must_use] on line 147 prevents subtle bugs where a caller might forget to store the sender. Cloning the cancellation token before task spawn (lines 113–114) correctly prevents the token from being invalidated prematurely.

taskfiles/tests/main.yaml (1)

32-36: Le commentaire d'examen repose sur des hypothèses incorrectes — aucune modification requise.

Le test d'ingestion est correctement marqué avec #[ignore = "Requires LocalStack or AWS environment"] dans components/log-ingestor/tests/test_ingestion_job.rs. La configuration LocalStack du fichier de tâche (lignes 32–36) injecte les variables d'environnement requises (CLP_LOG_INGESTOR_S3_BUCKET, CLP_LOG_INGESTOR_SQS_QUEUE) et exécute cargo test --include-ignored, ce qui est la configuration appropriée pour exécuter ce test ignoré. L'orchestration est correcte et intentionnelle.

Bien que la vérification de la synchronisation de Cargo.lock soit mentionnée dans les directives du dépôt, elle s'adresse aux cibles de vérification dédiées (flux de travail clp-rust-checks, cible deps:lock:check-rust), et non aux tâches de test générales. La modification proposée est déjà correctement implémentée.

Likely an incorrect or invalid review comment.

components/log-ingestor/Cargo.toml (2)

9-16: LGTM!

The AWS SDK dependencies, secrecy, serde_json, and uuid additions are appropriate for the SQS listener implementation and S3 integration.


18-20: LGTM!

The test dependencies serial_test and stdext are appropriate for the LocalStack-based integration tests.

components/clp-rust-utils/src/sqs/client.rs (1)

15-36: LGTM!

The endpoint parameter addition enables custom SQS endpoints for testing with LocalStack. The implementation correctly wires the endpoint into the AWS config builder.

components/log-ingestor/src/aws_client_manager.rs (3)

6-11: LGTM!

The marker trait pattern appropriately constrains generic types for AWS client management.


13-31: LGTM!

The AwsClientManagerType trait provides a clean abstraction that can support different client management strategies (singleton, auto-renew, etc.) as mentioned in the PR objectives.


33-50: LGTM!

The SqsClientWrapper provides a simple implementation that clones the client on each get() call. The const fn constructor is appropriate for this simple wrapper.

components/log-ingestor/src/lib.rs (1)

1-3: LGTM!

The new public module declarations appropriately expose the AWS client manager abstraction and the SQS listener functionality.

components/log-ingestor/src/ingestion_job.rs (1)

1-3: LGTM!

The module structure with private submodule and public re-export is a standard pattern for controlling API surface while maintaining internal organization flexibility.

components/log-ingestor/tests/aws_config.rs (1)

1-17: LGTM!

The default AWS configuration constants and AwsConfig struct are well-organized for LocalStack-based testing.

components/clp-rust-utils/src/sqs/event/s3.rs (1)

37-37: Clarify whether 32-bit platform support is a project requirement.

The concern about platform dependency is theoretically valid—changing from u64 to usize does introduce architecture-specific limits. However, no 32-bit target configurations were found in the project, and AWS SDKs for Rust technically support 32-bit targets (i686, armv7) only if the project explicitly targets them.

Verify: Is this codebase intended to support 32-bit platforms? If the project targets 64-bit only (as current configuration suggests), the change is acceptable. If 32-bit support is required, u64 should be retained to handle S3 objects up to 5 TB reliably.

components/log-ingestor/tests/test_ingestion_job.rs (2)

12-41: S3 object creation helper looks correct and aligns with ObjectMetadata

The helper correctly uses object.size to build the dummy body and forwards S3 put_object().send() errors via ?, which is appropriate for an integration test. No issues from a correctness or robustness standpoint here.


88-175: Existing documentation already addresses LocalStack and endpoint expectations; test logic is sound

The README.md provides clear guidance on LocalStack setup, including endpoint configuration and environment variables. The aws_config.rs file further documents the endpoint format and configuration options. The queue URL construction format!("{}/{}/{}", endpoint, account_id, queue_name) correctly produces the expected LocalStack queue URL format (e.g., http://127.0.0.1:4566/000000000000/queue_name), and this is consistent with the documented DEFAULT_AWS_ENDPOINT.

The test flakiness concern with 100 objects, batch size 2, and 30-second timeout is noted as acceptable for an integration test marked #[ignore], which requires explicit manual setup and execution in a controlled environment.

No blocking issues identified. The test structure and logic are sound, and the AWS configuration expectations are already well-documented.

components/log-ingestor/src/ingestion_job/sqs_listener.rs (3)

133-162: Verification confirms metadata extraction logic is correct

All field names and types in the Record, Entity, Bucket, and Object structs align perfectly with the usage in extract_object_metadata and is_relevant_object. The code correctly accesses nested fields (record.s3.bucket.name, record.s3.object.key, record.s3.object.size), and the method calls on String fields (.as_str(), .starts_with(), .ends_with()) are all valid. The filtering logic and ObjectMetadata mapping are sound.


52-76: The get() implementation is already cheap—no changes required.

The current SqsClientWrapper::get() implementation simply clones the client:

async fn get(&self) -> Result<SqsClient> {
    Ok(self.client.clone())
}

Since this is a trivial, non-blocking operation with no credential refresh or client rebuilding, the review comment's own condition ("If you intend get() to remain cheap, this is acceptable") is already met. The existing code structure is acceptable as written.


165-221: Code structure and lifecycle semantics are consistent with the compression listener pattern

The SqsListener lifecycle implementation matches the compression listener: spawn initializes the cancellation token and spawns the underlying task, shutdown_and_join cancels then awaits the handle, and get_id exposes the UUID. The shutdown semantics are identical, providing a consistent API for orchestrating listener lifecycles.

Comment thread components/log-ingestor/README.md Outdated
Comment thread components/log-ingestor/README.md Outdated
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs
Comment thread components/log-ingestor/tests/aws_config.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
components/log-ingestor/tests/test_ingestion_job.rs (1)

70-71: Handle the case where checked_sub returns None.

Although the guard at lines 66-68 checks for timeout, time can advance between that check and line 71, potentially causing checked_sub to return None and trigger a panic. Since you acknowledged fixing this in a previous review, please replace the unwrap() with explicit handling.

Apply this diff:

-        match tokio::time::timeout(
-            timeout_duration.checked_sub(elapsed).unwrap(),
-            receiver.recv(),
-        )
+        let remaining = timeout_duration.checked_sub(elapsed).unwrap_or(tokio::time::Duration::ZERO);
+        match tokio::time::timeout(
+            remaining,
+            receiver.recv(),
+        )
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f47b8ca and 42d5009.

📒 Files selected for processing (2)
  • components/log-ingestor/README.md (1 hunks)
  • components/log-ingestor/tests/test_ingestion_job.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-08-08T21:15:10.905Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 1100
File: integration-tests/tests/test_identity_transformation.py:87-95
Timestamp: 2025-08-08T21:15:10.905Z
Learning: In the CLP project's integration tests (Python code), variable names should use "logs" (plural) rather than "log" (singular) when referring to test logs or log-related entities, as this aligns with the naming conventions used throughout the codebase.

Applied to files:

  • components/log-ingestor/README.md
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/log-ingestor/README.md
📚 Learning: 2024-10-01T07:59:11.208Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Applied to files:

  • components/log-ingestor/tests/test_ingestion_job.rs
📚 Learning: 2024-10-10T05:46:35.188Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 554
File: components/core/src/clp/ffi/KeyValuePairLogEvent.cpp:299-307
Timestamp: 2024-10-10T05:46:35.188Z
Learning: In the C++ function `get_schema_subtree_bitmap` in `KeyValuePairLogEvent.cpp`, when a loop uses `while (true)` with an internal check on `optional.has_value()`, and comments explain that this structure is used to silence `clang-tidy` warnings about unchecked optional access, this code is acceptable and should not be refactored to use `while (optional.has_value())`.

Applied to files:

  • components/log-ingestor/tests/test_ingestion_job.rs
🧬 Code graph analysis (1)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
  • from_env (39-67)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (15-36)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (186-205)
components/log-ingestor/src/aws_client_manager.rs (1)
  • from (47-49)

Comment thread components/log-ingestor/tests/test_ingestion_job.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 42d5009 and f194d37.

📒 Files selected for processing (2)
  • components/log-ingestor/tests/aws_config.rs (1 hunks)
  • taskfiles/tests/main.yaml (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • taskfiles/tests/main.yaml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • taskfiles/tests/main.yaml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
  • GitHub Check: musllinux_1_2-x86_64-static-linked-bins
  • GitHub Check: musllinux_1_2-x86_64-dynamic-linked-bins
  • GitHub Check: centos-stream-9-dynamic-linked-bins
  • GitHub Check: ubuntu-jammy-lint
  • GitHub Check: package-image
  • GitHub Check: ubuntu-jammy-dynamic-linked-bins
  • GitHub Check: centos-stream-9-static-linked-bins
  • GitHub Check: ubuntu-jammy-static-linked-bins
  • GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
  • GitHub Check: manylinux_2_28-x86_64-static-linked-bins
  • GitHub Check: build-macos (macos-15, false)
  • GitHub Check: build-macos (macos-15, true)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks

Comment thread components/log-ingestor/tests/aws_config.rs Outdated
Comment thread taskfiles/tests/main.yaml Outdated

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm in general. There are some minor comments.

Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs Outdated
Comment thread components/log-ingestor/tests/test_ingestion_job.rs
Comment thread components/log-ingestor/tests/aws_config.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job/sqs_listener.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7f5d9d1 and ce55415.

📒 Files selected for processing (1)
  • components/log-ingestor/tests/test_ingestion_job.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2024-10-01T07:59:11.208Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.

Applied to files:

  • components/log-ingestor/tests/test_ingestion_job.rs
📚 Learning: 2024-10-10T05:46:35.188Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 554
File: components/core/src/clp/ffi/KeyValuePairLogEvent.cpp:299-307
Timestamp: 2024-10-10T05:46:35.188Z
Learning: In the C++ function `get_schema_subtree_bitmap` in `KeyValuePairLogEvent.cpp`, when a loop uses `while (true)` with an internal check on `optional.has_value()`, and comments explain that this structure is used to silence `clang-tidy` warnings about unchecked optional access, this code is acceptable and should not be refactored to use `while (optional.has_value())`.

Applied to files:

  • components/log-ingestor/tests/test_ingestion_job.rs
🧬 Code graph analysis (1)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
  • from_env (47-74)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (15-36)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
components/log-ingestor/src/aws_client_manager.rs (1)
  • from (47-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
  • GitHub Check: manylinux_2_28-x86_64-static-linked-bins
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks
🔇 Additional comments (2)
components/log-ingestor/tests/test_ingestion_job.rs (2)

50-68: Good simplification of the helper function.

The function now returns Vec<ObjectMetadata> directly instead of wrapping it in a Result, which correctly reflects that it doesn't produce errors. The doc comment accurately describes the behavior, and the timeout handling has been appropriately moved to the call site.


124-158: Well-structured integration test.

The test effectively validates the SQS listener's ability to:

  • Filter objects by prefix (using irrelevant_objects as negative test cases)
  • Receive all expected object metadata
  • Handle concurrent operations correctly

The use of tokio::time::timeout at the call site (rather than inside the helper) provides clearer timeout semantics and addresses previous feedback.

Comment thread components/log-ingestor/tests/test_ingestion_job.rs Outdated

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@LinZhihao-723 LinZhihao-723 merged commit 6bf138f into y-scope:main Nov 18, 2025
27 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants