feat(log-ingestor): Add SqsListener to consume S3 notifications via SQS.#1602
Conversation
WalkthroughAdds SQS/S3 integration and tests to log-ingestor: wires an endpoint into the SQS client, changes an S3 event size field, adds a cancellable SqsListener that polls SQS and emits S3 object metadata, introduces an AWS client manager abstraction, test helpers and an integration test, and updates dependencies and README. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant Test as Integration Test
participant Uploader as S3 Uploader
participant Listener as SqsListener Task
participant SQS as SQS Service
participant S3 as S3 Service
participant Channel as mpsc::Sender
participant Collector as Metadata Receiver
Note over Test,Uploader: setup and uploads
Test->>Uploader: spawn uploads (PutObject)
Uploader->>S3: PutObject
Note over Test,Listener: start listener
Test->>Listener: SqsListener::spawn(config, sender)
Listener->>SQS: ReceiveMessage (poll loop)
SQS-->>Listener: messages (S3 ObjectCreated events)
Listener->>Listener: parse & filter events
alt relevant object
Listener->>Channel: send ObjectMetadata
Listener->>SQS: DeleteMessage (receipt handle)
end
Channel-->>Collector: deliver metadata
Test->>Collector: gather results
Test->>Listener: shutdown_and_join()
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Areas needing extra attention:
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: ASSERTIVE Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧠 Learnings (3)📓 Common learnings📚 Learning: 2024-10-01T07:59:11.208ZApplied to files:
📚 Learning: 2024-10-10T05:46:35.188ZApplied to files:
🧬 Code graph analysis (1)components/log-ingestor/tests/test_ingestion_job.rs (4)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
🔇 Additional comments (3)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
This file is updated so that it's consistent with SqsListener.
There was a problem hiding this comment.
Actionable comments posted: 6
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (12)
components/clp-rust-utils/src/sqs/client.rs(2 hunks)components/clp-rust-utils/src/sqs/event/s3.rs(1 hunks)components/log-ingestor/Cargo.toml(1 hunks)components/log-ingestor/README.md(1 hunks)components/log-ingestor/src/aws_client_manager.rs(1 hunks)components/log-ingestor/src/compression/listener.rs(1 hunks)components/log-ingestor/src/ingestion_job.rs(1 hunks)components/log-ingestor/src/ingestion_job/sqs_listener.rs(1 hunks)components/log-ingestor/src/lib.rs(1 hunks)components/log-ingestor/tests/aws_config.rs(1 hunks)components/log-ingestor/tests/test_ingestion_job.rs(1 hunks)taskfiles/tests/main.yaml(1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/log-ingestor/README.mdcomponents/log-ingestor/Cargo.tomltaskfiles/tests/main.yaml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/log-ingestor/Cargo.tomltaskfiles/tests/main.yaml
📚 Learning: 2025-10-20T21:05:30.417Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1413
File: tools/docker-images/clp-package/Dockerfile:22-24
Timestamp: 2025-10-20T21:05:30.417Z
Learning: In the clp repository's Dockerfiles, ENV directives should be consolidated into multi-line ENV statements when possible to reduce image layers. ENV statements should only be split into separate commands when consolidation is not possible due to dependencies (e.g., when later variables must reference earlier ones that need to be set first, or when PATH must be modified sequentially).
Applied to files:
taskfiles/tests/main.yaml
🧬 Code graph analysis (2)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
from_env(39-67)components/clp-rust-utils/src/sqs/client.rs (1)
create_new_client(15-36)components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
spawn(186-205)components/log-ingestor/src/aws_client_manager.rs (1)
from(47-49)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (2)
components/log-ingestor/src/compression/listener.rs (3)
run(41-76)spawn(102-122)shutdown_and_join(135-138)components/log-ingestor/src/aws_client_manager.rs (2)
get(30-30)get(40-42)
🪛 GitHub Actions: clp-rust-checks
components/log-ingestor/tests/aws_config.rs
[error] 35-35: Rustfmt formatting check failed: diff detected in aws_config.rs (line 35). Run 'cargo +nightly fmt --all -- --check' or 'cargo +nightly fmt --all' to fix formatting.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
- GitHub Check: musllinux_1_2-x86_64-dynamic-linked-bins
- GitHub Check: musllinux_1_2-x86_64-static-linked-bins
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: package-image
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: ubuntu-jammy-lint
- GitHub Check: manylinux_2_28-x86_64-static-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: build-macos (macos-15, true)
- GitHub Check: build-macos (macos-15, false)
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (19)
components/log-ingestor/src/compression/listener.rs (3)
88-88: Documentation improvements are correctly applied.The grammar refinement on line 88 ("Creates and spawns" vs. "Create and spawn") aligns with Rust documentation conventions for function descriptions. The formatting update on line 96 properly uses backticks to enable intra-doc linking to the
Submittertrait, improving discoverability in generated documentation.Also applies to: 96-96
41-76: Method logic is sound and properly handles async event multiplexing.The
run()method correctly usestokio::select!to handle three exit/continuation paths. Timer reset on line 74 is appropriately placed outside the match arms so it executes only when the loop continues (after metadata receipt or timeout), not when early returns occur (cancellation or channel closure). The timeout interval resets consistently after each significant event, ensuring predictable buffering behaviour.
102-151: Public API design is well-structured and properly documented.The
spawn(),get_new_sender(), andshutdown_and_join()methods present a clear, idiomatic async API. The use of#[must_use]on line 147 prevents subtle bugs where a caller might forget to store the sender. Cloning the cancellation token before task spawn (lines 113–114) correctly prevents the token from being invalidated prematurely.taskfiles/tests/main.yaml (1)
32-36: Le commentaire d'examen repose sur des hypothèses incorrectes — aucune modification requise.Le test d'ingestion est correctement marqué avec
#[ignore = "Requires LocalStack or AWS environment"]danscomponents/log-ingestor/tests/test_ingestion_job.rs. La configuration LocalStack du fichier de tâche (lignes 32–36) injecte les variables d'environnement requises (CLP_LOG_INGESTOR_S3_BUCKET,CLP_LOG_INGESTOR_SQS_QUEUE) et exécutecargo test --include-ignored, ce qui est la configuration appropriée pour exécuter ce test ignoré. L'orchestration est correcte et intentionnelle.Bien que la vérification de la synchronisation de Cargo.lock soit mentionnée dans les directives du dépôt, elle s'adresse aux cibles de vérification dédiées (flux de travail
clp-rust-checks, cibledeps:lock:check-rust), et non aux tâches de test générales. La modification proposée est déjà correctement implémentée.Likely an incorrect or invalid review comment.
components/log-ingestor/Cargo.toml (2)
9-16: LGTM!The AWS SDK dependencies, secrecy, serde_json, and uuid additions are appropriate for the SQS listener implementation and S3 integration.
18-20: LGTM!The test dependencies serial_test and stdext are appropriate for the LocalStack-based integration tests.
components/clp-rust-utils/src/sqs/client.rs (1)
15-36: LGTM!The endpoint parameter addition enables custom SQS endpoints for testing with LocalStack. The implementation correctly wires the endpoint into the AWS config builder.
components/log-ingestor/src/aws_client_manager.rs (3)
6-11: LGTM!The marker trait pattern appropriately constrains generic types for AWS client management.
13-31: LGTM!The
AwsClientManagerTypetrait provides a clean abstraction that can support different client management strategies (singleton, auto-renew, etc.) as mentioned in the PR objectives.
33-50: LGTM!The
SqsClientWrapperprovides a simple implementation that clones the client on eachget()call. Theconst fnconstructor is appropriate for this simple wrapper.components/log-ingestor/src/lib.rs (1)
1-3: LGTM!The new public module declarations appropriately expose the AWS client manager abstraction and the SQS listener functionality.
components/log-ingestor/src/ingestion_job.rs (1)
1-3: LGTM!The module structure with private submodule and public re-export is a standard pattern for controlling API surface while maintaining internal organization flexibility.
components/log-ingestor/tests/aws_config.rs (1)
1-17: LGTM!The default AWS configuration constants and
AwsConfigstruct are well-organized for LocalStack-based testing.components/clp-rust-utils/src/sqs/event/s3.rs (1)
37-37: Clarify whether 32-bit platform support is a project requirement.The concern about platform dependency is theoretically valid—changing from
u64tousizedoes introduce architecture-specific limits. However, no 32-bit target configurations were found in the project, and AWS SDKs for Rust technically support 32-bit targets (i686, armv7) only if the project explicitly targets them.Verify: Is this codebase intended to support 32-bit platforms? If the project targets 64-bit only (as current configuration suggests), the change is acceptable. If 32-bit support is required,
u64should be retained to handle S3 objects up to 5 TB reliably.components/log-ingestor/tests/test_ingestion_job.rs (2)
12-41: S3 object creation helper looks correct and aligns with ObjectMetadataThe helper correctly uses
object.sizeto build the dummy body and forwards S3put_object().send()errors via?, which is appropriate for an integration test. No issues from a correctness or robustness standpoint here.
88-175: Existing documentation already addresses LocalStack and endpoint expectations; test logic is soundThe README.md provides clear guidance on LocalStack setup, including endpoint configuration and environment variables. The aws_config.rs file further documents the endpoint format and configuration options. The queue URL construction
format!("{}/{}/{}", endpoint, account_id, queue_name)correctly produces the expected LocalStack queue URL format (e.g.,http://127.0.0.1:4566/000000000000/queue_name), and this is consistent with the documented DEFAULT_AWS_ENDPOINT.The test flakiness concern with 100 objects, batch size 2, and 30-second timeout is noted as acceptable for an integration test marked
#[ignore], which requires explicit manual setup and execution in a controlled environment.No blocking issues identified. The test structure and logic are sound, and the AWS configuration expectations are already well-documented.
components/log-ingestor/src/ingestion_job/sqs_listener.rs (3)
133-162: Verification confirms metadata extraction logic is correctAll field names and types in the
Record,Entity,Bucket, andObjectstructs align perfectly with the usage inextract_object_metadataandis_relevant_object. The code correctly accesses nested fields (record.s3.bucket.name,record.s3.object.key,record.s3.object.size), and the method calls onStringfields (.as_str(),.starts_with(),.ends_with()) are all valid. The filtering logic andObjectMetadatamapping are sound.
52-76: Theget()implementation is already cheap—no changes required.The current
SqsClientWrapper::get()implementation simply clones the client:async fn get(&self) -> Result<SqsClient> { Ok(self.client.clone()) }Since this is a trivial, non-blocking operation with no credential refresh or client rebuilding, the review comment's own condition ("If you intend
get()to remain cheap, this is acceptable") is already met. The existing code structure is acceptable as written.
165-221: Code structure and lifecycle semantics are consistent with the compression listener patternThe
SqsListenerlifecycle implementation matches the compression listener:spawninitializes the cancellation token and spawns the underlying task,shutdown_and_joincancels then awaits the handle, andget_idexposes the UUID. The shutdown semantics are identical, providing a consistent API for orchestrating listener lifecycles.
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (1)
components/log-ingestor/tests/test_ingestion_job.rs (1)
70-71: Handle the case wherechecked_subreturnsNone.Although the guard at lines 66-68 checks for timeout, time can advance between that check and line 71, potentially causing
checked_subto returnNoneand trigger a panic. Since you acknowledged fixing this in a previous review, please replace theunwrap()with explicit handling.Apply this diff:
- match tokio::time::timeout( - timeout_duration.checked_sub(elapsed).unwrap(), - receiver.recv(), - ) + let remaining = timeout_duration.checked_sub(elapsed).unwrap_or(tokio::time::Duration::ZERO); + match tokio::time::timeout( + remaining, + receiver.recv(), + )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/log-ingestor/README.md(1 hunks)components/log-ingestor/tests/test_ingestion_job.rs(1 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-08-08T21:15:10.905Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 1100
File: integration-tests/tests/test_identity_transformation.py:87-95
Timestamp: 2025-08-08T21:15:10.905Z
Learning: In the CLP project's integration tests (Python code), variable names should use "logs" (plural) rather than "log" (singular) when referring to test logs or log-related entities, as this aligns with the naming conventions used throughout the codebase.
Applied to files:
components/log-ingestor/README.md
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/log-ingestor/README.md
📚 Learning: 2024-10-01T07:59:11.208Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Applied to files:
components/log-ingestor/tests/test_ingestion_job.rs
📚 Learning: 2024-10-10T05:46:35.188Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 554
File: components/core/src/clp/ffi/KeyValuePairLogEvent.cpp:299-307
Timestamp: 2024-10-10T05:46:35.188Z
Learning: In the C++ function `get_schema_subtree_bitmap` in `KeyValuePairLogEvent.cpp`, when a loop uses `while (true)` with an internal check on `optional.has_value()`, and comments explain that this structure is used to silence `clang-tidy` warnings about unchecked optional access, this code is acceptable and should not be refactored to use `while (optional.has_value())`.
Applied to files:
components/log-ingestor/tests/test_ingestion_job.rs
🧬 Code graph analysis (1)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
from_env(39-67)components/clp-rust-utils/src/sqs/client.rs (1)
create_new_client(15-36)components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
spawn(186-205)components/log-ingestor/src/aws_client_manager.rs (1)
from(47-49)
There was a problem hiding this comment.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
components/log-ingestor/tests/aws_config.rs(1 hunks)taskfiles/tests/main.yaml(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
taskfiles/tests/main.yaml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
taskfiles/tests/main.yaml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (15)
- GitHub Check: musllinux_1_2-x86_64-static-linked-bins
- GitHub Check: musllinux_1_2-x86_64-dynamic-linked-bins
- GitHub Check: centos-stream-9-dynamic-linked-bins
- GitHub Check: ubuntu-jammy-lint
- GitHub Check: package-image
- GitHub Check: ubuntu-jammy-dynamic-linked-bins
- GitHub Check: centos-stream-9-static-linked-bins
- GitHub Check: ubuntu-jammy-static-linked-bins
- GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
- GitHub Check: manylinux_2_28-x86_64-static-linked-bins
- GitHub Check: build-macos (macos-15, false)
- GitHub Check: build-macos (macos-15, true)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks
hoophalab
left a comment
There was a problem hiding this comment.
lgtm in general. There are some minor comments.
There was a problem hiding this comment.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/log-ingestor/tests/test_ingestion_job.rs(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2024-10-01T07:59:11.208Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Applied to files:
components/log-ingestor/tests/test_ingestion_job.rs
📚 Learning: 2024-10-10T05:46:35.188Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 554
File: components/core/src/clp/ffi/KeyValuePairLogEvent.cpp:299-307
Timestamp: 2024-10-10T05:46:35.188Z
Learning: In the C++ function `get_schema_subtree_bitmap` in `KeyValuePairLogEvent.cpp`, when a loop uses `while (true)` with an internal check on `optional.has_value()`, and comments explain that this structure is used to silence `clang-tidy` warnings about unchecked optional access, this code is acceptable and should not be refactored to use `while (optional.has_value())`.
Applied to files:
components/log-ingestor/tests/test_ingestion_job.rs
🧬 Code graph analysis (1)
components/log-ingestor/tests/test_ingestion_job.rs (4)
components/log-ingestor/tests/aws_config.rs (1)
from_env(47-74)components/clp-rust-utils/src/sqs/client.rs (1)
create_new_client(15-36)components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
spawn(187-206)components/log-ingestor/src/aws_client_manager.rs (1)
from(47-49)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: manylinux_2_28-x86_64-dynamic-linked-bins
- GitHub Check: manylinux_2_28-x86_64-static-linked-bins
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks
🔇 Additional comments (2)
components/log-ingestor/tests/test_ingestion_job.rs (2)
50-68: Good simplification of the helper function.The function now returns
Vec<ObjectMetadata>directly instead of wrapping it in aResult, which correctly reflects that it doesn't produce errors. The doc comment accurately describes the behavior, and the timeout handling has been appropriately moved to the call site.
124-158: Well-structured integration test.The test effectively validates the SQS listener's ability to:
- Filter objects by prefix (using
irrelevant_objectsas negative test cases)- Receive all expected object metadata
- Handle concurrent operations correctly
The use of
tokio::time::timeoutat the call site (rather than inside the helper) provides clearer timeout semantics and addresses previous feedback.
Description
This PR introduces
SqsListener, an SQS-based ingestion job implementation. This job listens to an SQS queue that subscribes to an S3 bucket. The S3 bucket will create event notifications, whileSqsListenerwill consume these notifications by extracting the S3 object metadata and forwarding them into a downstream receiver (which should be a compression listener introduced in #1552).Some key decisions made:
SqsListeneris dedicated to the particular ingestion job. CLP has the permission to delete any received message after successfully processing it.AwsClientManager, to adapt the need for the coming credential manager service, where we need to refresh the AWS client with temporary credentials. In the current implementation, the client manager implementation should be just a simple wrapper of the raw AWS client.This PR also adds test cases for emulated SQS services using LocalStack. By default, the tests can be executed with the task (same as the workflow). I also updated the component's README to include instructions for running these tests manually.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Tests
Documentation
Chores