feat(log-ingestor): Add partial implementation of `IngestionJobManager` for creating and deleting ingestion jobs. #1704
Walkthrough
Adds an async-safe `IngestionJobManager` to create/track S3 scanner and SQS listener jobs, consolidates S3 settings into a shared `S3IngestionBaseConfig`, removes secrecy-wrapping for AWS secrets, and adds factory constructors for S3/SQS clients and a unified `IngestionJob` abstraction.
Sequence Diagram(s)
sequenceDiagram
    autonumber
    participant Client as API Client
    participant Manager as IngestionJobManager
    participant AWS as AWS SDK (S3/SQS)
    participant Job as IngestionJob (S3Scanner / SqsListener)
    Client->>Manager: create_s3_scanner_job(config)
    Manager->>AWS: S3ClientWrapper::create(region, access_key, secret)
    AWS-->>Manager: S3 client
    Manager->>Job: spawn S3Scanner with client & config
    Job-->>Manager: started (job_id, key_prefix)
    Manager-->>Client: job_id
    alt shutdown
        Client->>Manager: shutdown_and_remove_job(job_id)
        Manager->>Job: ingestion_job.shutdown_and_join()
        Job-->>Manager: stopped
        Manager-->>Client: ok
    end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20–30 minutes
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/Cargo.toml (1)
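4-4: Verify that the toolchain supports edition "2024". Rust editions are 2015, 2018, 2021, and 2024; the 2024 edition was stabilized in Rust 1.85, so `edition = "2024"` is valid only on Rust 1.85 or newer. If the project must build with an older toolchain, fall back to "2021":
    -edition = "2024"
    +edition = "2021"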
♻️ Duplicate comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)
184-194: Hardcoded AWS SQS endpoint construction may not support all regions. Similar to the S3 endpoint issue, the SQS endpoint construction using string formatting may not work for GovCloud, China regions, FIPS endpoints, or custom services.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
- Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
- components/clp-rust-utils/src/ingestion_job.rs (1 hunks)
- components/clp-rust-utils/src/lib.rs (1 hunks)
- components/log-ingestor/Cargo.toml (1 hunks)
- components/log-ingestor/src/ingestion_job.rs (1 hunks)
- components/log-ingestor/src/ingestion_job/s3_scanner.rs (4 hunks)
- components/log-ingestor/src/ingestion_job/sqs_listener.rs (4 hunks)
- components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
- components/log-ingestor/src/lib.rs (1 hunks)
- components/log-ingestor/tests/test_ingestion_job.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.
Applied to files:
components/clp-rust-utils/src/ingestion_job.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/clp-rust-utils/src/ingestion_job.rs
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job.rs (2)
- components/log-ingestor/src/ingestion_job/s3_scanner.rs (2): shutdown_and_join (184-187), get_id (193-195)
- components/log-ingestor/src/ingestion_job/sqs_listener.rs (2): shutdown_and_join (219-222), get_id (228-230)
components/log-ingestor/src/ingestion_job_manager.rs (3)
- components/log-ingestor/src/ingestion_job.rs (2): from (47-49), from (53-55)
- components/log-ingestor/src/ingestion_job/s3_scanner.rs (1): spawn (152-171)
- components/log-ingestor/src/ingestion_job/sqs_listener.rs (1): spawn (187-206)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (16)
components/log-ingestor/Cargo.toml (1)
17-17: thiserror 2.0.17 is stable and compatible with Rust 2021. Version 2.0.17 is a stable release with Rust 2021 edition support and an MSRV of 1.61, confirming compatibility with the project's Rust edition. Dependency conflict verification requires inspecting Cargo.lock and the existing dependency tree.
components/log-ingestor/src/lib.rs (1)
4-4: LGTM! The module declaration is correct and properly exposes the new ingestion job manager.
components/clp-rust-utils/src/lib.rs (1)
4-4: LGTM! The module declaration correctly exposes the new ingestion job configuration types for cross-crate usage.
components/log-ingestor/src/ingestion_job/sqs_listener.rs (2)
14-22: LGTM! The refactoring to use the centralized `S3IngestionBaseConfig` is clean and consistent. All field accesses have been properly updated to reference the nested `base` configuration.
144-163: LGTM! The metadata extraction and object relevance checks correctly reference the nested base configuration fields (`config.base.bucket_name` and `config.base.key_prefix`).
components/log-ingestor/src/ingestion_job.rs (1)
8-56: LGTM! The `IngestionJob` enum provides a clean abstraction over the different ingestion job types. The delegation methods and `From` implementations are correctly implemented.
components/log-ingestor/tests/test_ingestion_job.rs (2)
166-184: LGTM! The test correctly constructs the `SqsListenerConfig` with the new nested `S3IngestionBaseConfig` structure.
251-262: LGTM! The test correctly constructs the `S3ScannerConfig` with the new nested `S3IngestionBaseConfig` structure.
components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)
12-18: LGTM! The refactoring to use the centralized `S3IngestionBaseConfig` is clean and consistent with the SQS listener changes.
92-123: LGTM! The S3 listing and metadata extraction correctly reference the nested base configuration fields.
components/log-ingestor/src/ingestion_job_manager.rs (5)
226-228: LGTM! The `is_mutually_prefix_free` logic is correct. It properly detects when two prefixes conflict (one is a prefix of the other).
121-164: LGTM! The generic `create` method properly enforces mutually prefix-free constraints across all existing jobs before creating a new entry. The locking strategy is correct, and the mutex guard is explicitly dropped after insertion.
53-63: LGTM! The S3 scanner job creation method correctly extracts the key prefix and constructs the appropriate client manager and ingestion job.
76-91: LGTM! The SQS listener job creation method follows the same pattern as the S3 scanner and correctly delegates to the generic `create` helper.
106-119: LGTM! The shutdown logic correctly removes the job from the table before shutting down both the ingestion job and listener, ensuring proper cleanup.
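The remove-then-shut-down ordering described for lines 106-119 can be illustrated with a minimal, synchronous sketch. Everything here is a stand-in: `JobEntry`, `JobTable`, and the `u64` job ID are hypothetical names chosen for illustration, and the real manager is async and stores an ingestion job plus a listener per entry.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an ingestion job and its listener.
pub struct JobEntry {
    pub name: String,
    pub stopped: bool,
}

impl JobEntry {
    /// Stand-in for `shutdown_and_join`; the real call may take a long time.
    pub fn shutdown_and_join(&mut self) {
        self.stopped = true;
    }
}

/// Hypothetical job table type: job ID -> entry, shared behind a mutex.
pub type JobTable = Arc<Mutex<HashMap<u64, JobEntry>>>;

/// Removes the entry while holding the lock, then shuts it down after the
/// guard has been dropped so other callers are not blocked during shutdown.
pub fn shutdown_and_remove_job(table: &JobTable, job_id: u64) -> Result<JobEntry, String> {
    let removed = {
        let mut guard = table.lock().map_err(|e| e.to_string())?;
        guard.remove(&job_id)
    }; // The guard goes out of scope here, releasing the lock.

    let mut entry = removed.ok_or_else(|| format!("job {job_id} not found"))?;
    entry.shutdown_and_join();
    Ok(entry)
}
```

Because the entry is taken out of the table before shutdown starts, a concurrent create call can proceed without waiting on the job being stopped.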
components/clp-rust-utils/src/ingestion_job.rs (1)
1-21: Verify whether Serde derives are needed for API integration. Based on the author's comment that this configuration is needed "to define API server parameters to create ingestion jobs," this struct may require `serde::Serialize` and `serde::Deserialize` derives for JSON serialization/deserialization in API endpoints. Check how this struct is used in API request handlers and compare with similar configuration structs in the codebase to confirm whether these derives are necessary.
Actionable comments posted: 1
📜 Review details
📒 Files selected for processing (1)
- components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-04-25T20:46:20.140Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 852
File: components/clp-package-utils/clp_package_utils/scripts/native/compress.py:151-160
Timestamp: 2025-04-25T20:46:20.140Z
Learning: For S3 URLs without region specifications (legacy global endpoints), either assign a default region (us-east-1) or throw a clear error message requiring region specification in the URL. This addresses validation issues in components like S3InputConfig that require a non-nullable region string.
Applied to files:
components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.
Applied to files:
components/log-ingestor/src/ingestion_job_manager.rs
🔇 Additional comments (3)
components/log-ingestor/src/ingestion_job_manager.rs (3)
108-121: Good lock release pattern. The implementation correctly removes the job entry from the table and explicitly drops the lock before performing the potentially blocking shutdown operations. This prevents holding the lock during long-running operations and avoids potential deadlocks.
155-167: Prefix conflict detection logic is correct. The implementation correctly identifies conflicts when both the region matches and the key prefixes are not mutually prefix-free. The logic appropriately skips non-conflicting entries and returns a clear error message for conflicts.
257-258: Utility function is correct and handles edge cases. The implementation correctly checks that neither string is a prefix of the other. The logic properly handles edge cases, including empty strings (an empty prefix would conflict with any non-empty prefix, which is the correct behavior).
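For reference, the prefix-free check and the region-scoped conflict detection discussed above can be sketched as follows. Only `is_mutually_prefix_free` is named in the review; `ExistingJob`, its fields, and `check_prefix_conflict` are hypothetical names used for illustration, and the actual implementation may differ.

```rust
/// Returns `true` when neither string is a prefix of the other.
pub fn is_mutually_prefix_free(a: &str, b: &str) -> bool {
    !a.starts_with(b) && !b.starts_with(a)
}

/// Hypothetical record of where an existing job ingests from.
pub struct ExistingJob {
    pub region: String,
    pub key_prefix: String,
}

/// Rejects a new job whose key prefix overlaps an existing job in the same
/// region. Jobs in other regions are skipped.
pub fn check_prefix_conflict(
    existing: &[ExistingJob],
    region: &str,
    key_prefix: &str,
) -> Result<(), String> {
    for job in existing {
        if job.region != region {
            continue;
        }
        if !is_mutually_prefix_free(&job.key_prefix, key_prefix) {
            return Err(format!(
                "key prefix '{}' conflicts with existing prefix '{}' in region '{}'",
                key_prefix, job.key_prefix, region
            ));
        }
    }
    Ok(())
}
```

Note that an empty prefix is a prefix of every string, so it conflicts with any other prefix in the same region, which matches the edge case called out in the comment.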
Actionable comments posted: 3
♻️ Duplicate comments (4)
components/log-ingestor/src/ingestion_job_manager.rs (4)
31-38: Public manager type is not constructible from outside this module. `IngestionJobManager` is `pub` but all fields are private and there is no public constructor or factory, so other modules cannot instantiate it. Consider adding a `pub fn new(...) -> Self` (or a module-level `pub fn create_ingestion_job_manager(...)`) that initialises `job_table` to `Arc::new(Mutex::new(HashMap::new()))` and wires the configuration fields and `AwsCredentials`.
195-223: Hardcoded S3/SQS endpoints will not work for all AWS partitions and special endpoints. `create_s3_client_manager` and `create_sqs_client_manager` build endpoints as `https://s3.{region}.amazonaws.com` and `https://sqs.{region}.amazonaws.com`. This only covers the standard `aws` partition; China (`aws-cn`) and GovCloud (`aws-us-gov`) require `.amazonaws.com.cn` and different host patterns, and FIPS endpoints use `*-fips` hosts. Given the repo's convention that S3 only supports AWS S3 with a mandatory `region_code`, it would be safer to rely on the AWS SDK's region-based endpoint resolution (or a small region→endpoint resolver) instead of string interpolation.
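If a hand-written resolver is preferred over the SDK's resolution, a small sketch might look like the following. It is deliberately incomplete: it only distinguishes the China DNS suffix (regions beginning with `cn-`) and the `-fips` host form, and it does not cover any other partition-specific pattern. The function names and the `use_fips` flag are hypothetical; letting the AWS SDK resolve the endpoint from the region remains the more robust option.

```rust
/// Picks the DNS suffix for a region. Assumption: regions starting with
/// `cn-` belong to the China partition; everything else uses the default.
pub fn dns_suffix(region: &str) -> &'static str {
    if region.starts_with("cn-") {
        "amazonaws.com.cn"
    } else {
        "amazonaws.com"
    }
}

/// Builds a regional endpoint URL for a service such as "s3" or "sqs".
/// This is an illustrative resolver, not an exhaustive one.
pub fn service_endpoint(service: &str, region: &str, use_fips: bool) -> String {
    let host_prefix = if use_fips {
        format!("{service}-fips")
    } else {
        service.to_string()
    };
    format!("https://{host_prefix}.{region}.{}", dns_suffix(region))
}
```

Centralising the logic in one function at least keeps the partition rules in a single place, so both client factories stay consistent.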
225-231: Listener creation wires buffer correctly, but submitter `todo!()` makes this path unsafe. `create_listener` constructs a `Buffer` with `CompressionJobSubmitter {}` and spawns a `Listener`, so once ingestion jobs start producing `ObjectMetadata`, the submitter's `submit` implementation will eventually be called. As long as `CompressionJobSubmitter::submit` remains a `todo!()`, any buffer flush will panic. Until the real implementation exists, consider returning a structured error from `submit` or gating creation of real jobs behind a feature/flag so this path cannot be hit in production.
242-250: Explicit `todo!()` in async trait impl is a sharp edge. Because `CompressionJobSubmitter` implements `BufferSubmitter` using `todo!()`, any caller that reaches `submit` will panic the task. If you must land this as a partial implementation, you may want to replace `todo!()` with something like `anyhow::bail!("CompressionJobSubmitter::submit is not implemented yet")` so callers receive a recoverable error instead of an unconditional panic.
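The difference between panicking and returning a recoverable error can be shown with a standard-library-only sketch. The `Submitter` trait below is a hypothetical synchronous stand-in for `BufferSubmitter` (whose real signature is not shown in this review), and `NotImplemented` is an illustrative error type used in place of `anyhow::bail!`.

```rust
use std::error::Error;
use std::fmt;

/// Illustrative error type for a code path that is not implemented yet.
#[derive(Debug)]
pub struct NotImplemented(pub &'static str);

impl fmt::Display for NotImplemented {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{} is not implemented yet", self.0)
    }
}

impl Error for NotImplemented {}

/// Hypothetical synchronous stand-in for the `BufferSubmitter` trait.
pub trait Submitter {
    fn submit(&self, object_keys: &[String]) -> Result<(), Box<dyn Error>>;
}

pub struct CompressionJobSubmitter;

impl Submitter for CompressionJobSubmitter {
    /// Returns an error instead of calling `todo!()`, so the caller can log
    /// the failure and keep its task alive.
    fn submit(&self, _object_keys: &[String]) -> Result<(), Box<dyn Error>> {
        Err(Box::new(NotImplemented("CompressionJobSubmitter::submit")))
    }
}
```

With this shape, a buffer flush that reaches the submitter produces an `Err` the caller can handle rather than aborting the task.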
📜 Review details
📒 Files selected for processing (2)
- components/clp-rust-utils/src/clp_config/s3_config.rs (2 hunks)
- components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job_manager.rs (4)
- components/log-ingestor/tests/test_ingestion_job.rs (2): mpsc (186-186), mpsc (264-264)
- components/log-ingestor/src/ingestion_job.rs (2): from (47-49), from (53-55)
- components/log-ingestor/src/ingestion_job/s3_scanner.rs (1): spawn (152-171)
- components/log-ingestor/src/ingestion_job/sqs_listener.rs (1): spawn (187-206)
🔇 Additional comments (7)
components/clp-rust-utils/src/clp_config/s3_config.rs (5)
1-2: LGTM! Imports are correctly specified for the `SecretString` usage and custom `Serialize` implementation.
28-34: LGTM! The `PartialEq` implementation correctly handles `SecretString` comparison using `expose_secret()`.
36-37: LGTM! Standard `Eq` marker implementation.
57-78: LGTM! The test correctly validates JSON serialization of both fields. The `SecretString::new()` usage is appropriate for secrecy 0.10.x.
22-26: Good use of `SecretString` for sensitive credential data. This prevents accidental exposure of the secret via `Debug` logging. However, note that a custom `Deserialize` implementation would be needed if `AwsCredentials` must be loaded from configuration files (e.g., YAML/JSON). If credentials are only constructed programmatically, this is fine.
components/log-ingestor/src/ingestion_job_manager.rs (2)
40-92: Job creation APIs are structured cleanly. `create_s3_scanner_job` and `create_sqs_listener` correctly derive `region`/`key_prefix`, build the AWS client managers before taking the job-table lock, and reuse `create_s3_ingestion_job` for shared logic. This keeps the hot path simple and avoids holding the mutex across `.await`s.
252-258: Prefix-free helper matches the documented semantics. `is_mutually_prefix_free` correctly returns `false` when either string is a prefix of the other and `true` otherwise, including edge cases like the empty string; the implementation is clear and symmetric.
Actionable comments posted: 2
📜 Review details
📒 Files selected for processing (7)
- components/clp-rust-utils/src/job_config.rs (1 hunks)
- components/clp-rust-utils/src/job_config/compression.rs (1 hunks)
- components/clp-rust-utils/src/job_config/ingestion.rs (1 hunks)
- components/log-ingestor/src/compression/compression_job_submitter.rs (1 hunks)
- components/log-ingestor/src/ingestion_job/s3_scanner.rs (4 hunks)
- components/log-ingestor/src/ingestion_job/sqs_listener.rs (4 hunks)
- components/log-ingestor/tests/test_ingestion_job.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.
Applied to files:
components/clp-rust-utils/src/job_config/ingestion.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/clp-rust-utils/src/job_config/ingestion.rs
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Applied to files:
components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-07-23T09:54:45.185Z
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Applied to files:
components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-04-26T02:21:22.021Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 856
File: components/core/src/clp/ffi/ir_stream/search/utils.cpp:258-266
Timestamp: 2025-04-26T02:21:22.021Z
Learning: In the clp::ffi::ir_stream::search namespace, the design principle is that callers are responsible for type checking, not the called functions. If control flow reaches a function, input types should already be validated by the caller.
Applied to files:
components/clp-rust-utils/src/job_config.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/compression/compression_job_submitter.rs (3)
- components/clp-py-utils/clp_py_utils/clp_config.py (2): AwsAuthentication (470-500), S3Config (503-507)
- components/clp-rust-utils/src/database/mysql.rs (1): create_mysql_pool (20-36)
- components/job-orchestration/job_orchestration/scheduler/job_config.py (3): ClpIoConfig (56-58), OutputConfig (47-53), S3InputConfig (32-44)
🔇 Additional comments (8)
components/clp-rust-utils/src/job_config/compression.rs (1)
1-13: LGTM! The enum is well-structured with appropriate derives for serialization (`Serialize`/`Deserialize`) and primitive conversion (`IntoPrimitive`/`TryFromPrimitive`). The documentation comment about mirroring the Python constant is helpful for maintainability.
components/clp-rust-utils/src/job_config.rs (1)
4-7: LGTM! The module declaration and glob re-export follow the established pattern used by other modules in this file.
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
14-22: LGTM! The refactoring to use `S3IngestionBaseConfig` consolidates common S3 configuration fields into a shared base type, improving consistency across `SqsListenerConfig` and `S3ScannerConfig`.
components/clp-rust-utils/src/job_config/ingestion.rs (1)
1-24: LGTM! The struct is well-documented and serves as a clean abstraction for shared S3 ingestion parameters. The fields align with the Python `S3InputConfig` and `S3Config` schemas from the relevant code snippets.
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
12-18: LGTM! The refactoring to use `S3IngestionBaseConfig` is consistent with the changes in `SqsListenerConfig`, enabling shared configuration across ingestion job types.
components/log-ingestor/tests/test_ingestion_job.rs (2)
173-181: LGTM! Test configuration correctly updated to use the new `S3IngestionBaseConfig` structure with all required and optional fields explicitly specified.
253-261: LGTM! Consistent with the SQS listener test configuration update.
components/log-ingestor/src/compression/compression_job_submitter.rs (1)
4-10: Unable to verify the review comment. Repository access failed during the verification attempt, making it impossible to confirm the import path issues, unused imports, or the accuracy of the proposed fix.
    match status {
        CompressionJobStatus::Failed => {
            tracing::error!(
                "Compression job {} failed. Status message: {:?}",
                job_id,
                status_message
            );
            return;
        }
        CompressionJobStatus::Succeeded => {
            break;
        }
        _ => continue,
    }

Killed status is treated as a transient state, causing indefinite polling.
When status is `CompressionJobStatus::Killed`, the `_ => continue` branch will keep polling indefinitely. A killed job won't transition to `Succeeded` or `Failed`, so this loop will never exit.
Apply this diff:
     match status {
         CompressionJobStatus::Failed => {
             tracing::error!(
                 "Compression job {} failed. Status message: {:?}",
                 job_id,
                 status_message
             );
             return;
         }
         CompressionJobStatus::Succeeded => {
             break;
         }
    +    CompressionJobStatus::Killed => {
    +        tracing::warn!(
    +            "Compression job {} was killed. Status message: {:?}",
    +            job_id,
    +            status_message
    +        );
    +        return;
    +    }
         _ => continue,
     }
🤖 Prompt for AI Agents
In components/log-ingestor/src/compression/compression_job_submitter.rs around
lines 141 to 154, the match treats CompressionJobStatus::Killed as a transient
state via the catch-all arm, causing infinite polling; add an explicit
CompressionJobStatus::Killed arm that logs the killed status (including job_id
and status_message) and then returns (or otherwise exits the loop) instead of
continuing, so killed jobs do not cause indefinite retries.
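A self-contained sketch of a polling loop that treats `Killed` as terminal is shown below. The `Failed`, `Succeeded`, and `Killed` variants come from the snippet under review; `Pending`, `Running`, the `PollOutcome` type, the `fetch_status` closure, and the `max_polls` bound are assumptions added so the example can run on its own.

```rust
/// Job statuses. `Pending` and `Running` are assumed transient states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionJobStatus {
    Pending,
    Running,
    Succeeded,
    Failed,
    Killed,
}

/// Hypothetical result of polling a job until it reaches a terminal state.
#[derive(Debug, PartialEq, Eq)]
pub enum PollOutcome {
    Succeeded,
    Failed,
    Killed,
    GaveUp,
}

/// Polls `fetch_status` until a terminal status is seen or `max_polls`
/// attempts have been made. Every terminal status has an explicit arm, so a
/// killed job cannot fall through to the transient case.
pub fn poll_until_terminal(
    mut fetch_status: impl FnMut() -> CompressionJobStatus,
    max_polls: usize,
) -> PollOutcome {
    for _ in 0..max_polls {
        match fetch_status() {
            CompressionJobStatus::Succeeded => return PollOutcome::Succeeded,
            CompressionJobStatus::Failed => return PollOutcome::Failed,
            CompressionJobStatus::Killed => return PollOutcome::Killed,
            CompressionJobStatus::Pending | CompressionJobStatus::Running => continue,
        }
    }
    PollOutcome::GaveUp
}
```

Listing the transient variants explicitly instead of using `_` means the compiler flags the match if a new status is added later, which guards against the same class of bug.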
Actionable comments posted: 1
♻️ Duplicate comments (6)
components/log-ingestor/src/ingestion_job_manager.rs (6)
32-39: Expose a constructor for `IngestionJobManager` so callers can actually use it. `IngestionJobManager` is `pub` but all fields are private and there is no `pub fn new` (or other factory), so external code currently cannot construct an instance. Consider adding a constructor that initialises `job_table` and wires in the buffer and AWS settings, e.g.:
     impl IngestionJobManager {
    +    /// Creates a new `IngestionJobManager`.
    +    #[must_use]
    +    pub fn new(
    +        buffer_timeout: Duration,
    +        buffer_size_threshold: u64,
    +        channel_capacity: usize,
    +        aws_credentials: AwsCredentials,
    +    ) -> Self {
    +        Self {
    +            job_table: Arc::new(Mutex::new(HashMap::new())),
    +            buffer_timeout,
    +            buffer_size_threshold,
    +            channel_capacity,
    +            aws_credentials,
    +        }
    +    }
    +
         /// Creates a new S3 scanner ingestion job.
93-119: Ensure listener shutdown is attempted even if ingestion shutdown fails. If `ingestion_job.shutdown_and_join().await` returns an error, `listener.shutdown_and_join().await` is never called, so the compression listener may keep running even though the job is already removed from `job_table`. It would be safer to perform both shutdowns in a best-effort manner (e.g., call both, then combine/choose which error to return) so one failure doesn't leak the other task.
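One way to express the best-effort shutdown is sketched below in synchronous form. The two closures stand in for `ingestion_job.shutdown_and_join()` and `listener.shutdown_and_join()`; the function name `shutdown_both` and the choice to report the ingestion job's error first are assumptions, not the author's design.

```rust
/// Runs both shutdown steps unconditionally, then reports the first error.
/// If the ingestion job fails, its error is returned and any listener error
/// is dropped; otherwise the listener's result is returned.
pub fn shutdown_both<E>(
    shutdown_job: impl FnOnce() -> Result<(), E>,
    shutdown_listener: impl FnOnce() -> Result<(), E>,
) -> Result<(), E> {
    // Evaluate both results before inspecting either one, so a failure in
    // the first step cannot skip the second.
    let job_result = shutdown_job();
    let listener_result = shutdown_listener();
    job_result.and(listener_result)
}
```

In the async manager the same idea applies: await both calls into local variables first, and only then decide which error to surface.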
174-181: UUID generation loop is correct but could be simplified. The `loop` with `contains_key` is functionally fine and safe under the mutex, but it's slightly verbose. You could either accept the astronomically low collision risk and generate once, or use `HashMap::entry` for a single lookup. Since a similar nit was previously raised, feel free to leave as-is if you prefer this style.
207-220: Avoid hardcoding S3 endpoints; rely on SDK region-based resolution instead. Constructing `https://s3.{region}.amazonaws.com` only works for the standard AWS partition and misses GovCloud/China/FIPS variations. Given the retrieved learnings that S3 is AWS-only and region is mandatory, it's better to let the AWS SDK derive the endpoint from the region (or use a small region→endpoint resolver) rather than interpolating the hostname yourself.
222-235: Similarly, avoid hardcoding SQS endpoints; delegate to the SDK. `https://sqs.{region}.amazonaws.com` has the same partition limitations as the S3 endpoint. Prefer configuring the client with the region and letting the AWS Rust SDK choose the correct host (including GovCloud/China/FIPS cases) instead of manual string construction.
256-264: `CompressionJobSubmitter::submit` uses `todo!()` in a production path. Because `create_listener` always instantiates `CompressionJobSubmitter`, any buffer submission will currently panic. The PR description notes this is intentional pending a future implementation, but you may want to replace `todo!` with a more descriptive error (or gate job creation) to avoid surprising runtime panics once the manager is wired up.
📜 Review details
📒 Files selected for processing (1)
- components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job_manager.rs (4)
components/log-ingestor/tests/test_ingestion_job.rs (2)
mpsc(187-187)
mpsc(266-266)
components/log-ingestor/src/ingestion_job.rs (2)
from(47-49)
from(53-55)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
spawn(187-206)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
spawn(152-171)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: package-image
- GitHub Check: rust-checks
- GitHub Check: build (ubuntu-24.04)
- GitHub Check: build (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
🔇 Additional comments (6)
components/log-ingestor/src/ingestion_job_manager.rs (6)
41-63: S3 scanner job creation flow looks sound.
Cloning the base config, creating a region-scoped S3 client manager, and delegating to `create_s3_ingestion_job` with a closure that spawns `S3Scanner` is straightforward and matches the intended job lifecycle; no issues from this function itself.
65-91: SQS listener job creation mirrors the S3 scanner path appropriately.
Using the shared `S3IngestionBaseConfig` for prefix conflict detection and wiring `SqsListener::spawn` through the same helper keeps the API consistent; this function itself looks correct.
121-172: Prefix-conflict detection now correctly includes bucket and dataset.
The updated check on `(region, bucket_name, dataset)` plus `is_mutually_prefix_free` matches the documented behaviour and fixes the prior false-conflict issue for identical prefixes across different buckets/datasets. The error message is clear and aligned with the logic.
183-205: Job-table insertion and listener wiring are coherent.
Cloning region/bucket/prefix/dataset into the table entry, creating a per-job `Listener`, and passing a fresh sender into the job-creation callback gives each job an isolated ingestion path while keeping shared config in the manager. No obvious correctness issues here.
246-254: Job table entry structure aligns with the conflict-detection logic.
Storing `region`, `bucket_name`, `key_prefix`, and `dataset` alongside the job and listener matches how conflicts are checked in `create_s3_ingestion_job`; this looks coherent.
266-272: `is_mutually_prefix_free` correctly captures the intended relationship.
The implementation directly matches the doc comment: it returns `true` only when neither string is a prefix of the other. This is exactly what the conflict logic expects.
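The relationship described in these comments can be sketched as follows. Only the name `is_mutually_prefix_free` and the four compared fields come from the review; the `JobScope` struct, the `conflicts` helper, and the `Option<String>` type for `dataset` are assumptions made for illustration, not the PR's actual code.

```rust
/// Returns `true` only when neither string is a prefix of the other.
pub fn is_mutually_prefix_free(a: &str, b: &str) -> bool {
    !a.starts_with(b) && !b.starts_with(a)
}

/// Hypothetical, trimmed-down view of the fields a job-table entry stores for conflict checks.
pub struct JobScope {
    pub region: String,
    pub bucket_name: String,
    pub dataset: Option<String>,
    pub key_prefix: String,
}

/// Two jobs conflict only if they target the same (region, bucket, dataset) and one key prefix
/// is a prefix of the other (identical prefixes included).
pub fn conflicts(existing: &JobScope, candidate: &JobScope) -> bool {
    existing.region == candidate.region
        && existing.bucket_name == candidate.bucket_name
        && existing.dataset == candidate.dataset
        && !is_mutually_prefix_free(&existing.key_prefix, &candidate.key_prefix)
}
```

Under this sketch, `logs/` and `logs/app/` conflict within one bucket, while the same prefix in two different buckets does not.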
    /// # Returns
    ///
    /// A new listener for receiving object metadata to ingest.
    fn create_listener(&self, _ingestion_job_config: S3IngestionBaseConfig) -> Listener {
        let buffer = Buffer::new(CompressionJobSubmitter {}, self.buffer_size_threshold);
        Listener::spawn(buffer, self.buffer_timeout, self.channel_capacity)
    }
🧹 Nitpick | 🔵 Trivial
create_listener ignores the per-job config; consider dropping or using the parameter.
_ingestion_job_config is currently unused, and all jobs share the same buffer settings from the manager. That may be intentional, but if listener behaviour is truly global, you could drop the parameter entirely for now to avoid confusion; if you plan to use it later, a short comment would clarify that it’s reserved for future per-job tuning.
🤖 Prompt for AI Agents
In components/log-ingestor/src/ingestion_job_manager.rs around lines 237 to 243,
the create_listener function takes _ingestion_job_config but ignores it and
always uses manager-wide buffer settings; either remove the unused parameter
from the signature (and update all call sites) to avoid confusion, or use the
provided S3IngestionBaseConfig to derive per-job
buffer_size_threshold/timeout/capacity when constructing Buffer and Listener; if
you intend to keep the parameter for future use, add a brief comment above the
parameter stating it is reserved for future per-job tuning to make the intent
explicit.
hoophalab left a comment
looks good to me in general. There are some comments on code organization.
    @@ -0,0 +1,24 @@
    /// Base configuration for ingesting logs from S3.
    #[derive(Clone, Debug)]
    pub struct S3IngestionBaseConfig {
I guess this is going to be serialized into clp-db? Can we reuse/extend `S3InputConfig`, which inherits `S3Config`? The fields look the same except for tags. If we use an existing job config, we don't need to change any Python code in compression_scheduler/compression_worker.
There was a problem hiding this comment.
- This won't go into clp-db for job submission. It's only the interface through which users submit ingestion jobs to the server.
- I prefer a dedicated struct: it makes the API documentation clearer.
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/log-ingestor/Cargo.toml (1)
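4-4: Edition "2024" requires a recent toolchain.
Rust editions are "2015", "2018", "2021", and "2024". The 2024 edition was stabilized in Rust 1.85, so the value is valid, but older toolchains will reject it and fail to compile the crate.
Only if the crate must build with a toolchain older than 1.85, apply this diff to fall back to the previous edition:
    -edition = "2024"
    +edition = "2021"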
♻️ Duplicate comments (4)
components/log-ingestor/src/ingestion_job_manager.rs (3)
227-235: `CompressionJobSubmitter` stub is acceptable for a partial PR but must be gated.
`CompressionJobSubmitter::submit` is still `todo!()`, so any path that actually drains a buffer will panic. Since the PR is explicitly a partial implementation, this is fine as long as `IngestionJobManager` (and thus `Listener` with this submitter) is not yet used in production code. Before wiring it into real flows, consider returning a structured error instead of panicking, or implementing the real submission logic.
31-38: Consider adding an explicit constructor for `IngestionJobManager`.
`IngestionJobManager` is `pub` but all its fields are private and there is no `new`/builder, so it cannot be constructed from other modules without a struct literal in this file. If you expect external callers to use it (even in a future PR), adding a `pub fn new(...) -> Self` that initialises `job_table` and takes the buffer settings and `AwsCredentials` would make the API clearer and avoid ad‑hoc initialisation.
111-123: Ensure listener is shut down even if ingestion job shutdown fails.
In `shutdown_and_remove_job`, if `ingestion_job.shutdown_and_join().await` returns an error, `listener.shutdown_and_join().await` is never called, so the listener task can be left running even though the job has been removed from the table.
You can attempt both shutdowns and then decide how to surface errors, for example:
    -            Some(entry) => {
    -                entry.ingestion_job.shutdown_and_join().await?;
    -                entry.listener.shutdown_and_join().await?;
    -                Ok(())
    -            }
    +            Some(entry) => {
    +                let ingestion_result = entry.ingestion_job.shutdown_and_join().await;
    +                let listener_result = entry.listener.shutdown_and_join().await;
    +
    +                if let Err(err) = ingestion_result {
    +                    return Err(Error::from(err));
    +                }
    +                if let Err(err) = listener_result {
    +                    return Err(Error::from(err));
    +                }
    +                Ok(())
    +            }
This way, both components get a best‑effort shutdown, and you still propagate errors to the caller.
components/log-ingestor/src/aws_client_manager.rs (1)
5-8: Hard‑coded S3/SQS endpoints in `create` limit partition and LocalStack support.
`SqsClientWrapper::create` and `S3ClientWrapper::create` always construct `https://sqs.{region}.amazonaws.com` / `https://s3.{region}.amazonaws.com`, ignoring any partition differences (e.g., `amazonaws.com.cn` for China, GovCloud, FIPS variants) and making it hard to point at LocalStack or other custom endpoints. Based on learnings, the S3 service here is intended to support AWS S3 generally, so baking in the commercial-domain pattern is brittle.
Consider one of:
- Letting the AWS SDK derive the endpoint purely from the region (do not pass a custom endpoint URL for normal AWS usage).
- Accepting an explicit endpoint (or a small region→endpoint resolver) so you can handle aws-cn / aws-us-gov / FIPS and LocalStack without recomputing hostnames ad hoc.
- At minimum, documenting that these helpers are only valid for standard AWS commercial regions.
Also applies to: 55-65, 86-96
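As a rough sketch of the second option, a small resolver with an explicit override might look like the code below. The function name `resolve_endpoint` and its parameters are hypothetical; it only special-cases the China partition's `amazonaws.com.cn` suffix and deliberately does not cover FIPS endpoints, so letting the SDK resolve the endpoint from the region remains the more robust choice.

```rust
/// Hypothetical helper: picks the endpoint URL for `service` (e.g., "s3" or "sqs") in `region`.
///
/// An explicit override (e.g., a LocalStack URL) takes precedence over the derived hostname.
pub fn resolve_endpoint(service: &str, region: &str, endpoint_override: Option<&str>) -> String {
    if let Some(url) = endpoint_override {
        return url.to_owned();
    }
    // Regions in the China partition ("cn-...") use a different DNS suffix.
    let dns_suffix = if region.starts_with("cn-") {
        "amazonaws.com.cn"
    } else {
        "amazonaws.com"
    };
    format!("https://{service}.{region}.{dns_suffix}")
}
```

For example, `resolve_endpoint("sqs", "us-east-1", Some("http://localhost:4566"))` returns the override unchanged, which is the hook a LocalStack-based test would need.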
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (6)
components/clp-rust-utils/src/s3/client.rs (1 hunks)
components/clp-rust-utils/src/sqs/client.rs (1 hunks)
components/log-ingestor/Cargo.toml (1 hunks)
components/log-ingestor/src/aws_client_manager.rs (3 hunks)
components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
components/log-ingestor/tests/test_ingestion_job.rs (5 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-04-25T20:46:20.140Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 852
File: components/clp-package-utils/clp_package_utils/scripts/native/compress.py:151-160
Timestamp: 2025-04-25T20:46:20.140Z
Learning: For S3 URLs without region specifications (legacy global endpoints), either assign a default region (us-east-1) or throw a clear error message requiring region specification in the URL. This addresses validation issues in components like S3InputConfig that require a non-nullable region string.
Applied to files:
components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.
Applied to files:
components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-09-15T22:20:40.750Z
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.
Applied to files:
components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/log-ingestor/Cargo.toml
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job_manager.rs (4)
components/log-ingestor/src/aws_client_manager.rs (4)
from(51-53)
from(82-84)
create(55-65)
create(86-96)
components/log-ingestor/src/ingestion_job.rs (2)
from(47-49)
from(53-55)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
spawn(152-171)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
spawn(187-206)
components/log-ingestor/src/aws_client_manager.rs (2)
components/clp-rust-utils/src/sqs/client.rs (1)
create_new_client(14-35)
components/log-ingestor/src/ingestion_job.rs (2)
from(47-49)
from(53-55)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: package-image
- GitHub Check: build (ubuntu-24.04)
- GitHub Check: build (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks
🔇 Additional comments (6)
components/log-ingestor/Cargo.toml (3)
22-22: Verify that `thiserror` is actually used in the codebase.
The `thiserror` dependency was added but its usage should be confirmed to avoid unnecessary dependencies. Search for `use thiserror` imports and `#[derive(Error)]` macro usage in the log-ingestor source files to ensure this dependency is actively utilized.
1-25: Verify Cargo.lock is in sync with Cargo.toml changes.
Per CI determinism requirements, confirm that the Cargo.lock file reflects all dependency changes in this diff.
14-14: Verify that removing the `serde` feature from secrecy does not break code.
The `serde` feature was removed from secrecy. If any code serializes or deserializes secrecy types (e.g., `Secret<String>`), this change will break compilation.
components/clp-rust-utils/src/s3/client.rs (1)
18-27: Signature change to `&str` and credential wiring look correct.
Switching `secret_access_key` to `&str` and passing it directly into `Credentials::new` is consistent with the SQS helper and updated call sites; the client construction logic remains unchanged and looks fine.
components/clp-rust-utils/src/sqs/client.rs (1)
14-23: SQS client helper update is consistent with the S3 helper.
Using `secret_access_key: &str` and passing it straight into `Credentials::new` keeps behaviour the same while aligning with the new call-site types; no issues from a correctness standpoint.
components/log-ingestor/src/ingestion_job_manager.rs (1)
211-214: Listener creation matches the buffering model; `_ingestion_job_config` is clearly marked unused.
`create_listener` correctly wires a `Buffer<CompressionJobSubmitter>` into a `Listener` using the manager’s buffer settings, and the leading underscore on `_ingestion_job_config` makes it clear the per‑job config is reserved for future tuning. If you plan to make buffering parameters per‑job later, you can start deriving them from this config without changing the public surface.
    }

    pub async fn create(region: &str, access_key_id: &str, secret_access_key: &str) -> Self {
        let sqs_endpoint = format!("https://sqs.{region}.amazonaws.com");
We might need to revisit this later if we want to test the ingestion manager on LocalStack. I'm not sure whether we can accept an endpoint as a parameter (and in `S3IngestionBaseConfig`) and pass it through here.
But let's merge this as-is for now.
There was a problem hiding this comment.
Yeah we can figure out how to pass endpoint later. Should be fine.
…r` for creating and deleting ingestion jobs. (y-scope#1704)
Description
As the title suggests. This PR adds a partial implementation of `IngestionJobManager` for creating and deleting S3 ingestion jobs (using `SqsListener` or `S3Scanner`). This PR includes:
A future PR will be required to implement `CompressionJobSubmitter` to actually submit jobs to CLP.
Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Chores