feat(log-ingestor): Add partial implementation of IngestionJobManager for creating and deleting ingestion jobs. #1704

Merged
LinZhihao-723 merged 17 commits into y-scope:main from LinZhihao-723:ingestion-job-manager
Dec 5, 2025

Conversation

@LinZhihao-723 LinZhihao-723 commented Nov 30, 2025

Member

Description

As the title suggests. This PR adds a partial implementation of IngestionJobManager for creating and deleting S3 ingestion jobs (using SqsListener or S3Scanner). This PR includes:

  • A method to create new jobs; the manager ensures that each new job's key prefix is mutually prefix-free with respect to the existing jobs.
  • The method to delete a running job.

A future PR will be required to implement CompressionJobSubmitter to actually submit jobs to CLP.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows pass.

Summary by CodeRabbit

  • New Features

    • Added an ingestion job manager to create and manage S3 scanner and SQS listener jobs with automatic prefix-conflict detection and graceful shutdown.
    • Standardized S3 ingestion configuration across components for simpler job setup.
  • Chores

    • Updated dependencies: added thiserror, adjusted secrecy entry, removed a dev dependency.
    • Simplified AWS credential handling by removing secret-wrapper usage in client creation.

@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner November 30, 2025 23:47
coderabbitai Bot commented Nov 30, 2025

Contributor

Walkthrough

Adds an async-safe IngestionJobManager to create/track S3 scanner and SQS listener jobs, consolidates S3 settings into a shared S3IngestionBaseConfig, removes secrecy-wrapping for AWS secrets, and adds factory constructors for S3/SQS clients and a unified IngestionJob abstraction.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Job manager & orchestration: components/log-ingestor/src/ingestion_job_manager.rs, components/log-ingestor/src/lib.rs | New IngestionJobManager with thread-safe job table, job creation APIs (create_s3_scanner_job, create_sqs_listener), shutdown/remove API, prefix-conflict checks, and exported module ingestion_job_manager. |
| Unified job abstraction: components/log-ingestor/src/ingestion_job.rs | Adds public IngestionJob enum wrapping S3Scanner and SqsListener, with shutdown_and_join() and get_id() methods and From conversions. |
| S3 scanner updates: components/log-ingestor/src/ingestion_job/s3_scanner.rs | S3ScannerConfig now contains base: S3IngestionBaseConfig; replaced uses of bucket_name/prefix with config.base.bucket_name/config.base.key_prefix. |
| SQS listener updates: components/log-ingestor/src/ingestion_job/sqs_listener.rs | SqsListenerConfig now contains base: S3IngestionBaseConfig; checks and metadata extraction use config.base.* fields. |
| Shared ingestion config: components/clp-rust-utils/src/job_config/ingestion.rs, components/clp-rust-utils/src/job_config.rs | Adds pub struct S3IngestionBaseConfig (region, bucket_name, key_prefix, dataset, timestamp_key, unstructured, tags) and re-exports it via pub use ingestion::*;. |
| AWS client signature & factories: components/clp-rust-utils/src/s3/client.rs, components/clp-rust-utils/src/sqs/client.rs, components/log-ingestor/src/aws_client_manager.rs | create_new_client signatures changed to accept secret_access_key: &str (removed SecretString); removed secrecy imports. Added pub async fn create(...) factory constructors on S3ClientWrapper and SqsClientWrapper. |
| Tests & examples updated: components/log-ingestor/tests/test_ingestion_job.rs | Tests updated to construct configs using S3IngestionBaseConfig and to pass credentials as plain &str (removed SecretString). |
| Manifest changes: components/log-ingestor/Cargo.toml | Dependency updates: thiserror = "2.0.17" added; secrecy entry simplified to version string; stdext dev-dependency removed. |

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Client as API Client
    participant Manager as IngestionJobManager
    participant AWS as AWS SDK (S3/SQS)
    participant Job as IngestionJob (S3Scanner / SqsListener)
    Client->>Manager: create_s3_scanner_job(config)
    Manager->>AWS: S3ClientWrapper::create(region, access_key, secret)
    AWS-->>Manager: S3 client
    Manager->>Job: spawn S3Scanner with client & config
    Job-->>Manager: started (job_id, key_prefix)
    Manager-->>Client: job_id

    alt shutdown
      Client->>Manager: shutdown_and_remove_job(job_id)
      Manager->>Job: ingestion_job.shutdown_and_join()
      Job-->>Manager: stopped
      Manager-->>Client: ok
    end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20–30 minutes

  • Pay extra attention to:
    • Thread-safety and locking semantics in IngestionJobManager (Arc/Mutex usage and potential deadlocks).
    • Correctness of is_mutually_prefix_free prefix-overlap logic.
    • All callsites updated for S3IngestionBaseConfig (no remaining references to removed fields).
    • Security implications of replacing SecretString with &str (where secrets are stored/passed).
    • Async shutdown/error propagation in job creation and shutdown_and_join.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The pull request title accurately reflects the main change: adding a partial implementation of IngestionJobManager with methods to create and delete ingestion jobs. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 20241f0 and 549c3cd.

📒 Files selected for processing (1)
  • components/log-ingestor/src/aws_client_manager.rs (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.

Applied to files:

  • components/log-ingestor/src/aws_client_manager.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/aws_client_manager.rs (2)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (14-35)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: rust-checks
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: build (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (2)
components/log-ingestor/src/aws_client_manager.rs (2)

51-61: SQS async constructor is consistent and correctly wired

The create constructor builds a region-specific SQS endpoint and passes arguments to clp_rust_utils::sqs::create_new_client in the expected order before wrapping with from. This is a clear, side‑effect‑free way to construct per‑region clients; no issues spotted.


82-92: S3 async constructor mirrors SQS pattern and looks correct

The S3 create implementation mirrors the SQS one, constructing a region‑specific endpoint and delegating to clp_rust_utils::s3::create_new_client, then wrapping via from. The symmetry between the two wrappers is good for maintainability; no problems identified.


Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/clp-rust-utils/src/job_config/ingestion.rs
@LinZhihao-723 LinZhihao-723 changed the title from "feat(log-ingestor): Add partial implementation of LogIngestorManager for creating and deleting ingestion jobs." to "feat(log-ingestor): Add partial implementation of IngestionJobManager for creating and deleting ingestion jobs." on Nov 30, 2025

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/Cargo.toml (1)

4-4: Critical: Edition "2024" is invalid.

Rust editions only support 2015, 2018, and 2021. Set the edition to "2021" (the current stable edition) or another valid edition.

-edition = "2024"
+edition = "2021"
♻️ Duplicate comments (1)
components/log-ingestor/src/ingestion_job_manager.rs (1)

184-194: Hardcoded AWS SQS endpoint construction may not support all regions.

Similar to the S3 endpoint issue, the SQS endpoint construction using string formatting may not work for GovCloud, China regions, FIPS endpoints, or custom services.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e6d7a58 and 8e504c1.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (9)
  • components/clp-rust-utils/src/ingestion_job.rs (1 hunks)
  • components/clp-rust-utils/src/lib.rs (1 hunks)
  • components/log-ingestor/Cargo.toml (1 hunks)
  • components/log-ingestor/src/ingestion_job.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs (4 hunks)
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs (4 hunks)
  • components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
  • components/log-ingestor/src/lib.rs (1 hunks)
  • components/log-ingestor/tests/test_ingestion_job.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.

Applied to files:

  • components/clp-rust-utils/src/ingestion_job.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/clp-rust-utils/src/ingestion_job.rs
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job.rs (2)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)
  • shutdown_and_join (184-187)
  • get_id (193-195)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (2)
  • shutdown_and_join (219-222)
  • get_id (228-230)
components/log-ingestor/src/ingestion_job_manager.rs (3)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (152-171)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (16)
components/log-ingestor/Cargo.toml (1)

17-17: thiserror 2.0.17 is stable and compatible with Rust 2021.

Version 2.0.17 is a stable release with Rust 2021 edition support and MSRV of 1.61, confirming compatibility with the project's Rust edition. Dependency conflict verification requires codebase inspection of Cargo.lock and existing dependency tree.

components/log-ingestor/src/lib.rs (1)

4-4: LGTM!

The module declaration is correct and properly exposes the new ingestion job manager.

components/clp-rust-utils/src/lib.rs (1)

4-4: LGTM!

The module declaration correctly exposes the new ingestion job configuration types for cross-crate usage.

components/log-ingestor/src/ingestion_job/sqs_listener.rs (2)

14-22: LGTM!

The refactoring to use the centralized S3IngestionBaseConfig is clean and consistent. All field accesses have been properly updated to reference the nested base configuration.


144-163: LGTM!

The metadata extraction and object relevance checks correctly reference the nested base configuration fields (config.base.bucket_name and config.base.key_prefix).
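
The nested-config access pattern described here can be sketched as follows. This is a minimal illustration, not the PR's code: only `bucket_name` and `key_prefix` are modelled, their `String` types are assumed, and `is_relevant_object` is a hypothetical helper name.

```rust
// Stand-in for the shared config; the remaining fields named in the
// walkthrough (region, dataset, timestamp_key, ...) are omitted here.
struct S3IngestionBaseConfig {
    bucket_name: String,
    key_prefix: String,
}

// Job-specific config that nests the shared base config.
struct SqsListenerConfig {
    base: S3IngestionBaseConfig,
}

// Hypothetical relevance check: an object is relevant when it lives in the
// configured bucket and its key starts with the configured prefix.
fn is_relevant_object(config: &SqsListenerConfig, bucket: &str, key: &str) -> bool {
    bucket == config.base.bucket_name.as_str()
        && key.starts_with(config.base.key_prefix.as_str())
}

fn main() {
    let config = SqsListenerConfig {
        base: S3IngestionBaseConfig {
            bucket_name: "my-logs".to_string(),
            key_prefix: "app/".to_string(),
        },
    };
    println!("{}", is_relevant_object(&config, "my-logs", "app/2025/12/05.log"));
    println!("{}", is_relevant_object(&config, "other-bucket", "app/2025/12/05.log"));
}
```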

components/log-ingestor/src/ingestion_job.rs (1)

8-56: LGTM!

The IngestionJob enum provides a clean abstraction over the different ingestion job types. The delegation methods and From implementations are correctly implemented.
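
For readers without the diff at hand, the enum-plus-`From` shape can be sketched like this. The two job structs are empty stand-ins, and the `u64` ID type is an assumption made for illustration (the real `get_id()` return type is not shown in this thread).

```rust
// Stand-ins for the real job types, which own background tasks.
struct S3Scanner {
    id: u64,
}

struct SqsListener {
    id: u64,
}

// One enum lets the manager store both job kinds in a single table.
enum IngestionJob {
    S3Scanner(S3Scanner),
    SqsListener(SqsListener),
}

impl IngestionJob {
    // Delegates to whichever variant is held.
    fn get_id(&self) -> u64 {
        match self {
            IngestionJob::S3Scanner(job) => job.id,
            IngestionJob::SqsListener(job) => job.id,
        }
    }
}

impl From<S3Scanner> for IngestionJob {
    fn from(job: S3Scanner) -> Self {
        IngestionJob::S3Scanner(job)
    }
}

impl From<SqsListener> for IngestionJob {
    fn from(job: SqsListener) -> Self {
        IngestionJob::SqsListener(job)
    }
}

fn main() {
    let jobs: Vec<IngestionJob> = vec![
        S3Scanner { id: 1 }.into(),
        SqsListener { id: 2 }.into(),
    ];
    for job in &jobs {
        println!("job id: {}", job.get_id());
    }
}
```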

components/log-ingestor/tests/test_ingestion_job.rs (2)

166-184: LGTM!

The test correctly constructs the SqsListenerConfig with the new nested S3IngestionBaseConfig structure.


251-262: LGTM!

The test correctly constructs the S3ScannerConfig with the new nested S3IngestionBaseConfig structure.

components/log-ingestor/src/ingestion_job/s3_scanner.rs (2)

12-18: LGTM!

The refactoring to use the centralized S3IngestionBaseConfig is clean and consistent with the SQS listener changes.


92-123: LGTM!

The S3 listing and metadata extraction correctly reference the nested base configuration fields.

components/log-ingestor/src/ingestion_job_manager.rs (5)

226-228: LGTM!

The is_mutually_prefix_free logic is correct. It properly detects when two prefixes conflict (one is a prefix of the other).


121-164: LGTM!

The generic create method properly enforces mutually prefix-free constraints across all existing jobs before creating a new entry. The locking strategy is correct, and the mutex guard is explicitly dropped after insertion.


53-63: LGTM!

The S3 scanner job creation method correctly extracts the key prefix and constructs the appropriate client manager and ingestion job.


76-91: LGTM!

The SQS listener job creation method follows the same pattern as the S3 scanner and correctly delegates to the generic create helper.


106-119: LGTM!

The shutdown logic correctly removes the job from the table before shutting down both the ingestion job and listener, ensuring proper cleanup.

components/clp-rust-utils/src/ingestion_job.rs (1)

1-21: Verify whether Serde derives are needed for API integration.

Based on the author's comment that this configuration is needed "to define API server parameters to create ingestion jobs," this struct may require serde::Serialize and serde::Deserialize derives for JSON serialization/deserialization in API endpoints. Check how this struct is used in API request handlers and compare with similar configuration structs in the codebase to confirm whether these derives are necessary.

Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job_manager.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8e504c1 and 53a5869.

📒 Files selected for processing (1)
  • components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-04-25T20:46:20.140Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 852
File: components/clp-package-utils/clp_package_utils/scripts/native/compress.py:151-160
Timestamp: 2025-04-25T20:46:20.140Z
Learning: For S3 URLs without region specifications (legacy global endpoints), either assign a default region (us-east-1) or throw a clear error message requiring region specification in the URL. This addresses validation issues in components like S3InputConfig that require a non-nullable region string.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job_manager.rs (3)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (152-171)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks
  • GitHub Check: build (macos-15)
🔇 Additional comments (3)
components/log-ingestor/src/ingestion_job_manager.rs (3)

108-121: Good lock release pattern.

The implementation correctly removes the job entry from the table and explicitly drops the lock before performing the potentially blocking shutdown operations. This prevents holding the lock during long-running operations and avoids potential deadlocks.
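
The remove-then-release pattern praised above can be sketched with the standard library alone. This is an illustrative simplification: the real manager is async, and `Job`, `Manager`, and `manager_with_jobs` are hypothetical stand-ins.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Job {
    id: u64,
}

impl Job {
    // Stand-in for a potentially slow shutdown; returns the ID of the stopped job.
    fn shutdown_and_join(self) -> u64 {
        self.id
    }
}

struct Manager {
    job_table: Arc<Mutex<HashMap<u64, Job>>>,
}

fn manager_with_jobs(ids: &[u64]) -> Manager {
    let table: HashMap<u64, Job> = ids.iter().map(|&id| (id, Job { id })).collect();
    Manager {
        job_table: Arc::new(Mutex::new(table)),
    }
}

impl Manager {
    fn shutdown_and_remove_job(&self, job_id: u64) -> Result<u64, String> {
        let mut table = self
            .job_table
            .lock()
            .map_err(|_| "job table lock poisoned".to_string())?;
        let job = table.remove(&job_id);
        // Release the lock before the slow part so other callers are not blocked.
        drop(table);
        let job = job.ok_or_else(|| format!("job {job_id} not found"))?;
        Ok(job.shutdown_and_join())
    }
}

fn main() {
    let manager = manager_with_jobs(&[1, 2]);
    println!("{:?}", manager.shutdown_and_remove_job(1));
    println!("{:?}", manager.shutdown_and_remove_job(1));
}
```

Because the entry is taken out of the table while the lock is held, a second caller asking to shut down the same job sees "not found" rather than racing on the shutdown itself.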


155-167: Prefix conflict detection logic is correct.

The implementation correctly identifies conflicts when both the region matches and the key prefixes are not mutually prefix-free. The logic appropriately skips non-conflicting entries and returns a clear error message for conflicts.
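
A rough sketch of the conflict scan described in this comment, assuming a job table keyed by a numeric ID. `JobEntry` and `find_conflict` are hypothetical names, and the entry is reduced to the two fields the comment mentions.

```rust
use std::collections::HashMap;

struct JobEntry {
    region: String,
    key_prefix: String,
}

fn is_mutually_prefix_free(a: &str, b: &str) -> bool {
    !a.starts_with(b) && !b.starts_with(a)
}

// Returns the ID of an existing job that conflicts with the candidate, if any.
// Entries in a different region are skipped.
fn find_conflict(table: &HashMap<u64, JobEntry>, region: &str, key_prefix: &str) -> Option<u64> {
    table
        .iter()
        .find(|(_, entry)| {
            entry.region == region && !is_mutually_prefix_free(&entry.key_prefix, key_prefix)
        })
        .map(|(id, _)| *id)
}

fn main() {
    let mut table = HashMap::new();
    table.insert(
        1,
        JobEntry {
            region: "us-east-1".to_string(),
            key_prefix: "logs/app/".to_string(),
        },
    );
    println!("{:?}", find_conflict(&table, "us-east-1", "logs/"));
    println!("{:?}", find_conflict(&table, "us-west-2", "logs/"));
    println!("{:?}", find_conflict(&table, "us-east-1", "metrics/"));
}
```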


257-258: Utility function is correct and handles edge cases.

The implementation correctly checks that neither string is a prefix of the other. The logic properly handles edge cases, including empty strings (an empty prefix would conflict with any non-empty prefix, which is the correct behavior).
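
The edge cases mentioned here are easy to check with a small standalone version of the helper. The body below is a plausible one-line implementation written for illustration; the PR's actual function body is not quoted in this thread.

```rust
/// Returns true when neither string is a prefix of the other.
fn is_mutually_prefix_free(a: &str, b: &str) -> bool {
    !a.starts_with(b) && !b.starts_with(a)
}

fn main() {
    // Sibling prefixes do not conflict.
    assert!(is_mutually_prefix_free("logs/app-a/", "logs/app-b/"));
    // A parent prefix conflicts with its child, in either argument order.
    assert!(!is_mutually_prefix_free("logs/", "logs/app-a/"));
    assert!(!is_mutually_prefix_free("logs/app-a/", "logs/"));
    // The empty prefix is a prefix of everything, so it always conflicts.
    assert!(!is_mutually_prefix_free("", "logs/"));
    // Identical prefixes conflict as well.
    assert!(!is_mutually_prefix_free("logs/", "logs/"));
    println!("all prefix checks passed");
}
```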

Comment thread components/log-ingestor/src/ingestion_job_manager.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (4)
components/log-ingestor/src/ingestion_job_manager.rs (4)

31-38: Public manager type is not constructible from outside this module.

IngestionJobManager is pub but all fields are private and there is no public constructor or factory, so other modules cannot instantiate it. Consider adding a pub fn new(...) -> Self (or a module-level pub fn create_ingestion_job_manager(...)) that initialises job_table to Arc::new(Mutex::new(HashMap::new())) and wires the configuration fields and AwsCredentials.
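
The suggested constructor might look roughly like the sketch below. The field set is an assumption: the real struct carries more configuration, and `AwsCredentials` and the job-table value type are reduced to simple stand-ins here.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Simplified stand-in for the credentials type.
pub struct AwsCredentials {
    pub access_key_id: String,
    pub secret_access_key: String,
}

pub struct IngestionJobManager {
    // Job ID -> key prefix; the real table stores richer entries.
    job_table: Arc<Mutex<HashMap<u64, String>>>,
    aws_credentials: AwsCredentials,
}

impl IngestionJobManager {
    /// Public constructor so other modules can build a manager
    /// even though the fields stay private.
    pub fn new(aws_credentials: AwsCredentials) -> Self {
        Self {
            job_table: Arc::new(Mutex::new(HashMap::new())),
            aws_credentials,
        }
    }

    pub fn num_jobs(&self) -> usize {
        self.job_table.lock().map(|table| table.len()).unwrap_or(0)
    }

    pub fn access_key_id(&self) -> &str {
        &self.aws_credentials.access_key_id
    }
}

fn main() {
    let manager = IngestionJobManager::new(AwsCredentials {
        access_key_id: "EXAMPLE_KEY_ID".to_string(),
        secret_access_key: "example-secret".to_string(),
    });
    println!("{} jobs for {}", manager.num_jobs(), manager.access_key_id());
}
```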


195-223: Hardcoded S3/SQS endpoints will not work for all AWS partitions and special endpoints.

create_s3_client_manager and create_sqs_client_manager build endpoints as https://s3.{region}.amazonaws.com and https://sqs.{region}.amazonaws.com. This only covers the standard aws partition; China (aws-cn) and GovCloud (aws-us-gov) require .amazonaws.com.cn and different host patterns, and FIPS endpoints use *-fips hosts. Given the repo’s convention that S3 only supports AWS S3 with a mandatory region_code, it would be safer to rely on the AWS SDK’s region-based endpoint resolution (or a small region→endpoint resolver) instead of string interpolation.
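
As an illustration of the "small region→endpoint resolver" alternative, the sketch below picks a DNS suffix from the region name. It assumes that `cn-` regions use the `amazonaws.com.cn` suffix and that all other regions use `amazonaws.com`; FIPS and other endpoint variants are deliberately not handled, and both function names are hypothetical. Letting the AWS SDK resolve endpoints from the region remains the more robust option.

```rust
// Assumption: only the China partition needs a different DNS suffix.
fn dns_suffix_for_region(region: &str) -> &'static str {
    if region.starts_with("cn-") {
        "amazonaws.com.cn"
    } else {
        "amazonaws.com"
    }
}

// Builds a regional endpoint URL for a service such as "s3" or "sqs".
fn regional_endpoint(service: &str, region: &str) -> String {
    format!("https://{service}.{region}.{}", dns_suffix_for_region(region))
}

fn main() {
    println!("{}", regional_endpoint("s3", "us-east-1"));
    println!("{}", regional_endpoint("sqs", "cn-north-1"));
}
```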


225-231: Listener creation wires buffer correctly, but submitter todo!() makes this path unsafe.

create_listener constructs a Buffer with CompressionJobSubmitter {} and spawns a Listener, so once ingestion jobs start producing ObjectMetadata, the submitter’s submit implementation will eventually be called. As long as CompressionJobSubmitter::submit remains a todo!(), any buffer flush will panic. Until the real implementation exists, consider returning a structured error from submit or gating creation of real jobs behind a feature/flag so this path cannot be hit in production.


242-250: Explicit todo!() in async trait impl is a sharp edge.

Because CompressionJobSubmitter implements BufferSubmitter using todo!(), any caller that reaches submit will panic the task. If you must land this as a partial implementation, you may want to replace todo!() with something like anyhow::bail!("CompressionJobSubmitter::submit is not implemented yet") so callers receive a recoverable error instead of an unconditional panic.
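
The difference between panicking and returning an error can be shown with a simplified, synchronous stand-in for the trait. The real `BufferSubmitter` is async and its signature is not quoted in this thread, so the trait shape, the `String` error type, and the `flush` helper below are all assumptions.

```rust
// Simplified synchronous stand-in for the submitter trait.
trait BufferSubmitter {
    fn submit(&self, object_keys: &[String]) -> Result<(), String>;
}

struct CompressionJobSubmitter;

impl BufferSubmitter for CompressionJobSubmitter {
    fn submit(&self, _object_keys: &[String]) -> Result<(), String> {
        // Returning an error (instead of `todo!()`) lets the caller decide what to do.
        Err("CompressionJobSubmitter::submit is not implemented yet".to_string())
    }
}

// Hypothetical caller: reports whether the flush succeeded instead of panicking.
fn flush(submitter: &dyn BufferSubmitter, object_keys: &[String]) -> bool {
    match submitter.submit(object_keys) {
        Ok(()) => true,
        Err(message) => {
            eprintln!("flush failed: {message}");
            false
        }
    }
}

fn main() {
    let keys = vec!["logs/app/0001.log".to_string()];
    println!("flushed: {}", flush(&CompressionJobSubmitter, &keys));
}
```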

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 092fe30 and 74f3302.

📒 Files selected for processing (2)
  • components/clp-rust-utils/src/clp_config/s3_config.rs (2 hunks)
  • components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-04-25T20:46:20.140Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 852
File: components/clp-package-utils/clp_package_utils/scripts/native/compress.py:151-160
Timestamp: 2025-04-25T20:46:20.140Z
Learning: For S3 URLs without region specifications (legacy global endpoints), either assign a default region (us-east-1) or throw a clear error message requiring region specification in the URL. This addresses validation issues in components like S3InputConfig that require a non-nullable region string.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job_manager.rs (4)
components/log-ingestor/tests/test_ingestion_job.rs (2)
  • mpsc (186-186)
  • mpsc (264-264)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (152-171)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: package-image
  • GitHub Check: build (macos-15)
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
🔇 Additional comments (7)
components/clp-rust-utils/src/clp_config/s3_config.rs (5)

1-2: LGTM!

Imports are correctly specified for the SecretString usage and custom Serialize implementation.


28-34: LGTM!

The PartialEq implementation correctly handles SecretString comparison using expose_secret().


36-37: LGTM!

Standard Eq marker implementation.


57-78: LGTM!

The test correctly validates JSON serialization of both fields. The SecretString::new() usage is appropriate for secrecy 0.10.x.


22-26: Good use of SecretString for sensitive credential data.

This prevents accidental exposure of the secret via Debug logging. However, note that a custom Deserialize implementation would be needed if AwsCredentials must be loaded from configuration files (e.g., YAML/JSON). If credentials are only constructed programmatically, this is fine.
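
To make the Debug-redaction and comparison behaviour concrete without pulling in the secrecy crate, the sketch below uses a hypothetical `Redacted` newtype as a stand-in for `SecretString`. It mirrors the idea only: the real type has a different API surface and also zeroizes memory on drop.

```rust
use std::fmt;

// Hypothetical stand-in for a secret wrapper.
struct Redacted(String);

impl Redacted {
    fn expose_secret(&self) -> &str {
        &self.0
    }
}

// Debug never prints the wrapped value.
impl fmt::Debug for Redacted {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("[REDACTED]")
    }
}

#[derive(Debug)]
struct AwsCredentials {
    access_key_id: String,
    secret_access_key: Redacted,
}

// Equality has to be written by hand because the wrapper does not implement PartialEq.
impl PartialEq for AwsCredentials {
    fn eq(&self, other: &Self) -> bool {
        self.access_key_id == other.access_key_id
            && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
    }
}

impl Eq for AwsCredentials {}

fn make_credentials(secret: &str) -> AwsCredentials {
    AwsCredentials {
        access_key_id: "EXAMPLE_KEY_ID".to_string(),
        secret_access_key: Redacted(secret.to_string()),
    }
}

fn main() {
    let credentials = make_credentials("s3cr3t");
    println!("{credentials:?}");
    println!("{}", credentials == make_credentials("s3cr3t"));
}
```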

components/log-ingestor/src/ingestion_job_manager.rs (2)

40-92: Job creation APIs are structured cleanly.

create_s3_scanner_job and create_sqs_listener correctly derive region/key_prefix, build the AWS client managers before taking the job-table lock, and reuse create_s3_ingestion_job for shared logic. This keeps the hot path simple and avoids holding the mutex across .awaits.


252-258: Prefix-free helper matches the documented semantics.

is_mutually_prefix_free correctly returns false when either string is a prefix of the other and true otherwise, including edge cases like the empty string; the implementation is clear and symmetric.

Comment thread components/clp-rust-utils/src/clp_config/s3_config.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 74f3302 and fdf259e.

📒 Files selected for processing (7)
  • components/clp-rust-utils/src/job_config.rs (1 hunks)
  • components/clp-rust-utils/src/job_config/compression.rs (1 hunks)
  • components/clp-rust-utils/src/job_config/ingestion.rs (1 hunks)
  • components/log-ingestor/src/compression/compression_job_submitter.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job/s3_scanner.rs (4 hunks)
  • components/log-ingestor/src/ingestion_job/sqs_listener.rs (4 hunks)
  • components/log-ingestor/tests/test_ingestion_job.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.

Applied to files:

  • components/clp-rust-utils/src/job_config/ingestion.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/clp-rust-utils/src/job_config/ingestion.rs
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-07-23T09:54:45.185Z
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-04-26T02:21:22.021Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 856
File: components/core/src/clp/ffi/ir_stream/search/utils.cpp:258-266
Timestamp: 2025-04-26T02:21:22.021Z
Learning: In the clp::ffi::ir_stream::search namespace, the design principle is that callers are responsible for type checking, not the called functions. If control flow reaches a function, input types should already be validated by the caller.

Applied to files:

  • components/clp-rust-utils/src/job_config.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/compression/compression_job_submitter.rs (3)
components/clp-py-utils/clp_py_utils/clp_config.py (2)
  • AwsAuthentication (470-500)
  • S3Config (503-507)
components/clp-rust-utils/src/database/mysql.rs (1)
  • create_mysql_pool (20-36)
components/job-orchestration/job_orchestration/scheduler/job_config.py (3)
  • ClpIoConfig (56-58)
  • OutputConfig (47-53)
  • S3InputConfig (32-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: rust-checks
  • GitHub Check: build (macos-15)
🔇 Additional comments (8)
components/clp-rust-utils/src/job_config/compression.rs (1)

1-13: LGTM!

The enum is well-structured with appropriate derives for serialization (Serialize/Deserialize) and primitive conversion (IntoPrimitive/TryFromPrimitive). The documentation comment about mirroring the Python constant is helpful for maintainability.

components/clp-rust-utils/src/job_config.rs (1)

4-7: LGTM!

The module declaration and glob re-export follow the established pattern used by other modules in this file.

components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)

14-22: LGTM!

The refactoring to use S3IngestionBaseConfig consolidates common S3 configuration fields into a shared base type, improving consistency across SqsListenerConfig and S3ScannerConfig.

components/clp-rust-utils/src/job_config/ingestion.rs (1)

1-24: LGTM!

The struct is well-documented and serves as a clean abstraction for shared S3 ingestion parameters. The fields align with the Python S3InputConfig and S3Config schemas from the relevant code snippets.

components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)

12-18: LGTM!

The refactoring to use S3IngestionBaseConfig is consistent with the changes in SqsListenerConfig, enabling shared configuration across ingestion job types.

components/log-ingestor/tests/test_ingestion_job.rs (2)

173-181: LGTM!

Test configuration correctly updated to use the new S3IngestionBaseConfig structure with all required and optional fields explicitly specified.


253-261: LGTM!

Consistent with the SQS listener test configuration update.

components/log-ingestor/src/compression/compression_job_submitter.rs (1)

4-10: Unable to verify the review comment. Repository access failed during verification attempt, making it impossible to confirm the import path issues, unused imports, or the accuracy of the proposed fix.

Comment thread components/log-ingestor/src/compression/compression_job_submitter.rs Outdated
Comment on lines +141 to +154
match status {
    CompressionJobStatus::Failed => {
        tracing::error!(
            "Compression job {} failed. Status message: {:?}",
            job_id,
            status_message
        );
        return;
    }
    CompressionJobStatus::Succeeded => {
        break;
    }
    _ => continue,
}

Contributor

⚠️ Potential issue | 🟠 Major

Killed status is treated as a transient state, causing indefinite polling.

When status is CompressionJobStatus::Killed, the _ => continue branch will keep polling indefinitely. A killed job won't transition to Succeeded or Failed, so this loop will never exit.

Apply this diff:

                 match status {
                     CompressionJobStatus::Failed => {
                         tracing::error!(
                             "Compression job {} failed. Status message: {:?}",
                             job_id,
                             status_message
                         );
                         return;
                     }
                     CompressionJobStatus::Succeeded => {
                         break;
                     }
+                    CompressionJobStatus::Killed => {
+                        tracing::warn!(
+                            "Compression job {} was killed. Status message: {:?}",
+                            job_id,
+                            status_message
+                        );
+                        return;
+                    }
                     _ => continue,
                 }
🤖 Prompt for AI Agents
In components/log-ingestor/src/compression/compression_job_submitter.rs around
lines 141 to 154, the match treats CompressionJobStatus::Killed as a transient
state via the catch-all arm, causing infinite polling; add an explicit
CompressionJobStatus::Killed arm that logs the killed status (including job_id
and status_message) and then returns (or otherwise exits the loop) instead of
continuing, so killed jobs do not cause indefinite retries.
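
A related option is to drop the catch-all arm so the compiler flags any status that is not handled explicitly. The following is a minimal sketch under stated assumptions: the variant set of `CompressionJobStatus` is assumed for illustration, `WaitOutcome` and `wait_for_terminal` are hypothetical names, and an iterator of statuses stands in for the real polling loop.

```rust
/// Assumed variant set, for illustration only; the real enum may differ.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionJobStatus {
    Pending,
    Running,
    Succeeded,
    Failed,
    Killed,
}

/// Hypothetical result of waiting on a job.
#[derive(Debug, PartialEq, Eq)]
pub enum WaitOutcome {
    Succeeded,
    Failed,
    Killed,
    /// The status source ended before a terminal state was observed.
    StillRunning,
}

/// Consumes polled statuses until a terminal one appears.
pub fn wait_for_terminal(
    statuses: impl IntoIterator<Item = CompressionJobStatus>,
) -> WaitOutcome {
    for status in statuses {
        // No `_` arm: adding a variant later forces this match to be revisited.
        match status {
            CompressionJobStatus::Pending | CompressionJobStatus::Running => continue,
            CompressionJobStatus::Succeeded => return WaitOutcome::Succeeded,
            CompressionJobStatus::Failed => return WaitOutcome::Failed,
            CompressionJobStatus::Killed => return WaitOutcome::Killed,
        }
    }
    WaitOutcome::StillRunning
}
```

With this shape, only the two in-progress states keep the loop going, so a killed job ends the wait on the first poll that reports it.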

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 1

♻️ Duplicate comments (6)
components/log-ingestor/src/ingestion_job_manager.rs (6)

32-39: Expose a constructor for IngestionJobManager so callers can actually use it.

IngestionJobManager is pub but all fields are private and there is no pub fn new (or other factory), so external code currently cannot construct an instance. Consider adding a constructor that initialises job_table and wires in the buffer and AWS settings, e.g.:

 impl IngestionJobManager {
+    /// Creates a new `IngestionJobManager`.
+    #[must_use]
+    pub fn new(
+        buffer_timeout: Duration,
+        buffer_size_threshold: u64,
+        channel_capacity: usize,
+        aws_credentials: AwsCredentials,
+    ) -> Self {
+        Self {
+            job_table: Arc::new(Mutex::new(HashMap::new())),
+            buffer_timeout,
+            buffer_size_threshold,
+            channel_capacity,
+            aws_credentials,
+        }
+    }
+
     /// Creates a new S3 scanner ingestion job.

93-119: Ensure listener shutdown is attempted even if ingestion shutdown fails.

If ingestion_job.shutdown_and_join().await returns an error, listener.shutdown_and_join().await is never called, so the compression listener may keep running even though the job is already removed from job_table. It would be safer to perform both shutdowns in a best-effort manner (e.g., call both, then combine/choose which error to return) so one failure doesn’t leak the other task.
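
A minimal sketch of the best-effort pattern, assuming synchronous closures as stand-ins for the two async `shutdown_and_join` calls; `shutdown_both` is a hypothetical helper name, not part of the PR.

```rust
/// Runs both shutdown steps unconditionally and reports the first error.
/// The two closures are hypothetical stand-ins for the job and listener
/// shutdown calls.
pub fn shutdown_both<E>(
    shutdown_job: impl FnOnce() -> Result<(), E>,
    shutdown_listener: impl FnOnce() -> Result<(), E>,
) -> Result<(), E> {
    // Evaluate both results before inspecting either, so a failure in the
    // first step cannot skip the second.
    let job_result = shutdown_job();
    let listener_result = shutdown_listener();
    // `and` keeps the job error if there is one, otherwise the listener result.
    job_result.and(listener_result)
}
```

This variant gives the ingestion job's error priority when both steps fail; returning both errors would need a combined error type.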


174-181: UUID generation loop is correct but could be simplified.

The loop with contains_key is functionally fine and safe under the mutex, but it’s slightly verbose. You could either accept the astronomically low collision risk and generate once, or use HashMap::entry for a single lookup. Since a similar nit was previously raised, feel free to leave as-is if you prefer this style.
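
For reference, the single-lookup variant could look roughly like the sketch below. It assumes a `u64` key and a caller-supplied `next_id` closure as hypothetical stand-ins for the UUID type and generator used in the PR.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

/// Inserts `value` under the first unused ID produced by `next_id`.
/// `next_id` is a hypothetical stand-in for a UUID generator.
pub fn insert_with_fresh_id<V>(
    table: &mut HashMap<u64, V>,
    mut next_id: impl FnMut() -> u64,
    value: V,
) -> u64 {
    loop {
        let id = next_id();
        // `entry` hashes the key once and exposes whether the slot is free.
        if let Entry::Vacant(slot) = table.entry(id) {
            slot.insert(value);
            return id;
        }
    }
}
```

The loop only repeats on a collision, which matches the behaviour of the `contains_key` version while avoiding the second lookup on insert.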


207-220: Avoid hardcoding S3 endpoints; rely on SDK region-based resolution instead.

Constructing https://s3.{region}.amazonaws.com only works for the standard AWS partition and misses GovCloud/China/FIPS variations. Given the retrieved learnings that S3 is AWS-only and region is mandatory, it’s better to let the AWS SDK derive the endpoint from the region (or use a small region→endpoint resolver) rather than interpolating the hostname yourself.


222-235: Similarly, avoid hardcoding SQS endpoints; delegate to the SDK.

https://sqs.{region}.amazonaws.com has the same partition limitations as the S3 endpoint. Prefer configuring the client with the region and letting the AWS Rust SDK choose the correct host (including GovCloud/China/FIPS cases) instead of manual string construction.


256-264: CompressionJobSubmitter::submit uses todo!() in a production path.

Because create_listener always instantiates CompressionJobSubmitter, any buffer submission will currently panic. The PR description notes this is intentional pending a future implementation, but you may want to replace todo! with a more descriptive error (or gate job creation) to avoid surprising runtime panics once the manager is wired up.
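
One possible shape for a non-panicking stub is sketched below. The `Submit` trait, its synchronous signature, and `SubmitError` are all hypothetical and only illustrate returning an error where `todo!()` currently panics; the real submitter interface in the PR may differ.

```rust
use std::fmt;

/// Hypothetical error type for submission failures.
#[derive(Debug, PartialEq, Eq)]
pub enum SubmitError {
    NotImplemented,
}

impl fmt::Display for SubmitError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SubmitError::NotImplemented => {
                write!(f, "compression job submission is not implemented yet")
            }
        }
    }
}

impl std::error::Error for SubmitError {}

/// Hypothetical, synchronous stand-in for the submitter interface.
pub trait Submit {
    fn submit(&mut self, object_keys: &[String]) -> Result<(), SubmitError>;
}

pub struct CompressionJobSubmitter;

impl Submit for CompressionJobSubmitter {
    fn submit(&mut self, _object_keys: &[String]) -> Result<(), SubmitError> {
        // Report the gap to the caller instead of panicking via `todo!()`.
        Err(SubmitError::NotImplemented)
    }
}
```

A caller can then log the error and keep the listener task alive until the real submission logic lands.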

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8719d55 and 9b07f51.

📒 Files selected for processing (1)
  • components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-04-25T20:46:20.140Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 852
File: components/clp-package-utils/clp_package_utils/scripts/native/compress.py:151-160
Timestamp: 2025-04-25T20:46:20.140Z
Learning: For S3 URLs without region specifications (legacy global endpoints), either assign a default region (us-east-1) or throw a clear error message requiring region specification in the URL. This addresses validation issues in components like S3InputConfig that require a non-nullable region string.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/ingestion_job_manager.rs (4)
components/log-ingestor/tests/test_ingestion_job.rs (2)
  • mpsc (187-187)
  • mpsc (266-266)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (152-171)
🔇 Additional comments (6)
components/log-ingestor/src/ingestion_job_manager.rs (6)

41-63: S3 scanner job creation flow looks sound.

Cloning the base config, creating a region-scoped S3 client manager, and delegating to create_s3_ingestion_job with a closure that spawns S3Scanner is straightforward and matches the intended job lifecycle; no issues from this function itself.


65-91: SQS listener job creation mirrors the S3 scanner path appropriately.

Using the shared S3IngestionBaseConfig for prefix conflict detection and wiring SqsListener::spawn through the same helper keeps the API consistent; this function itself looks correct.


121-172: Prefix-conflict detection now correctly includes bucket and dataset.

The updated check on (region, bucket_name, dataset) plus is_mutually_prefix_free matches the documented behaviour and fixes the prior false-conflict issue for identical prefixes across different buckets/datasets. The error message is clear and aligned with the logic.
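
The scoping rule described here can be sketched as follows. `JobScope` and `find_conflict` are hypothetical names, the `Option<String>` type for `dataset` is an assumption, and a slice stands in for the manager's job table.

```rust
/// Hypothetical subset of the per-job fields used for conflict checks.
#[derive(Clone, Debug)]
pub struct JobScope {
    pub region: String,
    pub bucket_name: String,
    pub dataset: Option<String>,
    pub key_prefix: String,
}

/// Returns the first existing job whose scope overlaps with `candidate`.
pub fn find_conflict<'a>(
    existing: &'a [JobScope],
    candidate: &JobScope,
) -> Option<&'a JobScope> {
    existing.iter().find(|job| {
        // Jobs in a different region, bucket, or dataset can never overlap.
        let same_scope = job.region == candidate.region
            && job.bucket_name == candidate.bucket_name
            && job.dataset == candidate.dataset;
        // Within one scope, overlap means one prefix contains the other.
        same_scope
            && (job.key_prefix.starts_with(candidate.key_prefix.as_str())
                || candidate.key_prefix.starts_with(job.key_prefix.as_str()))
    })
}
```

Under this sketch, the same prefix in two different buckets is accepted, while a parent and child prefix in the same bucket and dataset are rejected.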


183-205: Job-table insertion and listener wiring are coherent.

Cloning region/bucket/prefix/dataset into the table entry, creating a per-job Listener, and passing a fresh sender into the job-creation callback gives each job an isolated ingestion path while keeping shared config in the manager. No obvious correctness issues here.


246-254: Job table entry structure aligns with the conflict-detection logic.

Storing region, bucket_name, key_prefix, and dataset alongside the job and listener matches how conflicts are checked in create_s3_ingestion_job; this looks coherent.


266-272: is_mutually_prefix_free correctly captures the intended relationship.

The implementation directly matches the doc comment: it returns true only when neither string is a prefix of the other. This is exactly what the conflict logic expects.
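
For readers without the diff open, a function with the described behaviour can be written as below. This is a sketch based only on the description above, not a copy of the PR's code.

```rust
/// Returns `true` only when neither string is a prefix of the other.
pub fn is_mutually_prefix_free(a: &str, b: &str) -> bool {
    !a.starts_with(b) && !b.starts_with(a)
}
```

Two edge cases follow from this definition: identical strings are not prefix free, and an empty prefix conflicts with every other prefix because every string starts with the empty string.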

Comment on lines +237 to +243
/// # Returns
///
/// A new listener for receiving object metadata to ingest.
fn create_listener(&self, _ingestion_job_config: S3IngestionBaseConfig) -> Listener {
    let buffer = Buffer::new(CompressionJobSubmitter {}, self.buffer_size_threshold);
    Listener::spawn(buffer, self.buffer_timeout, self.channel_capacity)
}

Contributor

🧹 Nitpick | 🔵 Trivial

create_listener ignores the per-job config; consider dropping or using the parameter.

_ingestion_job_config is currently unused, and all jobs share the same buffer settings from the manager. That may be intentional, but if listener behaviour is truly global, you could drop the parameter entirely for now to avoid confusion; if you plan to use it later, a short comment would clarify that it’s reserved for future per-job tuning.

🤖 Prompt for AI Agents
In components/log-ingestor/src/ingestion_job_manager.rs around lines 237 to 243,
the create_listener function takes _ingestion_job_config but ignores it and
always uses manager-wide buffer settings; either remove the unused parameter
from the signature (and update all call sites) to avoid confusion, or use the
provided S3IngestionBaseConfig to derive per-job
buffer_size_threshold/timeout/capacity when constructing Buffer and Listener; if
you intend to keep the parameter for future use, add a brief comment above the
parameter stating it is reserved for future per-job tuning to make the intent
explicit.

@hoophalab hoophalab left a comment

Contributor

Looks good to me in general. I left some comments on code organization.

Comment thread components/clp-rust-utils/src/clp_config/s3_config.rs Outdated
@@ -0,0 +1,24 @@
/// Base configuration for ingesting logs from S3.
#[derive(Clone, Debug)]
pub struct S3IngestionBaseConfig {

Contributor

I guess this is going to be serialized into clp-db? Can we reuse or extend S3InputConfig, which inherits from S3Config? The fields look the same except for tags. If we use an existing job config, we don't need to change any Python code in compression_scheduler/compression_worker.

Member Author

  • This won't go into clp-db for job submission. It's only the interface through which users submit ingestion jobs to the server.
  • I prefer a dedicated struct: it makes the API documentation clearer.

Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job_manager.rs Outdated
Comment thread components/log-ingestor/src/ingestion_job.rs

@coderabbitai coderabbitai Bot left a comment

Contributor

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/log-ingestor/Cargo.toml (1)

4-4: Edition "2024" requires Rust 1.85 or newer.

Rust editions are "2015", "2018", "2021", and "2024"; the 2024 edition was stabilized in Rust 1.85. The value is valid, so no change is needed as long as the toolchain used to build this crate is at least 1.85. Older toolchains will reject the manifest.
♻️ Duplicate comments (4)
components/log-ingestor/src/ingestion_job_manager.rs (3)

227-235: CompressionJobSubmitter stub is acceptable for a partial PR but must be gated.

CompressionJobSubmitter::submit is still todo!(), so any path that actually drains a buffer will panic. Since the PR is explicitly a partial implementation, this is fine as long as IngestionJobManager (and thus Listener with this submitter) is not yet used in production code. Before wiring it into real flows, consider returning a structured error instead of panicking, or implementing the real submission logic.


31-38: Consider adding an explicit constructor for IngestionJobManager.

IngestionJobManager is pub but all its fields are private and there is no new/builder, so it cannot be constructed from other modules without a struct literal in this file. If you expect external callers to use it (even in a future PR), adding a pub fn new(...) -> Self that initialises job_table and takes the buffer settings and AwsCredentials would make the API clearer and avoid ad‑hoc initialisation.


111-123: Ensure listener is shut down even if ingestion job shutdown fails.

In shutdown_and_remove_job, if ingestion_job.shutdown_and_join().await returns an error, listener.shutdown_and_join().await is never called, so the listener task can be left running even though the job has been removed from the table.

You can attempt both shutdowns and then decide how to surface errors, for example:

-            Some(entry) => {
-                entry.ingestion_job.shutdown_and_join().await?;
-                entry.listener.shutdown_and_join().await?;
-                Ok(())
-            }
+            Some(entry) => {
+                let ingestion_result = entry.ingestion_job.shutdown_and_join().await;
+                let listener_result = entry.listener.shutdown_and_join().await;
+
+                if let Err(err) = ingestion_result {
+                    return Err(Error::from(err));
+                }
+                if let Err(err) = listener_result {
+                    return Err(Error::from(err));
+                }
+                Ok(())
+            }

This way, both components get a best‑effort shutdown, and you still propagate errors to the caller.

components/log-ingestor/src/aws_client_manager.rs (1)

5-8: Hard‑coded S3/SQS endpoints in create limit partition and LocalStack support.

SqsClientWrapper::create and S3ClientWrapper::create always construct https://sqs.{region}.amazonaws.com / https://s3.{region}.amazonaws.com, ignoring any partition differences (e.g., amazonaws.com.cn for China, GovCloud, FIPS variants) and making it hard to point at LocalStack or other custom endpoints. Based on learnings, the S3 service here is intended to support AWS S3 generally, so baking in the commercial-domain pattern is brittle.

Consider one of:

  • Letting the AWS SDK derive the endpoint purely from the region (do not pass a custom endpoint URL for normal AWS usage).
  • Accepting an explicit endpoint (or a small region→endpoint resolver) so you can handle aws-cn / aws-us-gov / FIPS and LocalStack without recomputing hostnames ad hoc.
  • At minimum, documenting that these helpers are only valid for standard AWS commercial regions.

Also applies to: 55-65, 86-96
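
A small sketch of the "explicit endpoint" option, assuming an optional override is threaded through from configuration. `endpoint_override` is a hypothetical helper and the LocalStack URL in the note below is only an example value; no AWS SDK calls are shown.

```rust
/// Chooses the endpoint URL to hand to a client builder.
/// Returning `None` means "set only the region and let the SDK resolve the
/// host", which covers the partition-specific cases without string building.
pub fn endpoint_override(custom_endpoint: Option<&str>) -> Option<String> {
    custom_endpoint
        .map(str::trim)
        // Treat a blank value the same as no override.
        .filter(|url| !url.is_empty())
        // Normalise away trailing slashes so callers can append paths safely.
        .map(|url| url.trim_end_matches('/').to_string())
}
```

With this shape, a test setup could pass something like `Some("http://localhost:4566/")`, while production callers pass `None` and rely on region-based resolution.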

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9b07f51 and 20241f0.

📒 Files selected for processing (6)
  • components/clp-rust-utils/src/s3/client.rs (1 hunks)
  • components/clp-rust-utils/src/sqs/client.rs (1 hunks)
  • components/log-ingestor/Cargo.toml (1 hunks)
  • components/log-ingestor/src/aws_client_manager.rs (3 hunks)
  • components/log-ingestor/src/ingestion_job_manager.rs (1 hunks)
  • components/log-ingestor/tests/test_ingestion_job.rs (5 hunks)
🧰 Additional context used
🧠 Learnings (6)
📓 Common learnings
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-04-25T20:46:20.140Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 852
File: components/clp-package-utils/clp_package_utils/scripts/native/compress.py:151-160
Timestamp: 2025-04-25T20:46:20.140Z
Learning: For S3 URLs without region specifications (legacy global endpoints), either assign a default region (us-east-1) or throw a clear error message requiring region specification in the URL. This addresses validation issues in components like S3InputConfig that require a non-nullable region string.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-01-15T16:36:48.932Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/job-orchestration/job_orchestration/scheduler/job_config.py:29-39
Timestamp: 2025-01-15T16:36:48.932Z
Learning: The S3 service in the clp codebase only supports AWS S3, where region_code is mandatory. Other S3-like services are not supported.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-09-15T22:20:40.750Z
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.

Applied to files:

  • components/log-ingestor/src/ingestion_job_manager.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/log-ingestor/Cargo.toml
🧬 Code graph analysis (2)
components/log-ingestor/src/ingestion_job_manager.rs (4)
components/log-ingestor/src/aws_client_manager.rs (4)
  • from (51-53)
  • from (82-84)
  • create (55-65)
  • create (86-96)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
components/log-ingestor/src/ingestion_job/s3_scanner.rs (1)
  • spawn (152-171)
components/log-ingestor/src/ingestion_job/sqs_listener.rs (1)
  • spawn (187-206)
components/log-ingestor/src/aws_client_manager.rs (2)
components/clp-rust-utils/src/sqs/client.rs (1)
  • create_new_client (14-35)
components/log-ingestor/src/ingestion_job.rs (2)
  • from (47-49)
  • from (53-55)
🔇 Additional comments (6)
components/log-ingestor/Cargo.toml (3)

22-22: Verify that thiserror is actually used in the codebase.

The thiserror dependency was added but its usage should be confirmed to avoid unnecessary dependencies. Search for use thiserror imports and #[derive(Error)] macro usage in the log-ingestor source files to ensure this dependency is actively utilized.


1-25: Verify Cargo.lock is in sync with Cargo.toml changes.

Per CI determinism requirements, confirm that the Cargo.lock file reflects all dependency changes in this diff.


14-14: Verify that removing the serde feature from secrecy does not break code.

The serde feature was removed from secrecy. If any code serializes or deserializes secrecy types (e.g., Secret<String>), this change will break compilation.

components/clp-rust-utils/src/s3/client.rs (1)

18-27: Signature change to &str and credential wiring look correct.

Switching secret_access_key to &str and passing it directly into Credentials::new is consistent with the SQS helper and updated call sites; the client construction logic remains unchanged and looks fine.

components/clp-rust-utils/src/sqs/client.rs (1)

14-23: SQS client helper update is consistent with the S3 helper.

Using secret_access_key: &str and passing it straight into Credentials::new keeps behaviour the same while aligning with the new call-site types; no issues from a correctness standpoint.

components/log-ingestor/src/ingestion_job_manager.rs (1)

211-214: Listener creation matches the buffering model; _ingestion_job_config is clearly marked unused.

create_listener correctly wires a Buffer<CompressionJobSubmitter> into a Listener using the manager’s buffer settings, and the leading underscore on _ingestion_job_config makes it clear the per‑job config is reserved for future tuning. If you plan to make buffering parameters per‑job later, you can start deriving them from this config without changing the public surface.

Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/src/ingestion_job_manager.rs
Comment thread components/log-ingestor/tests/test_ingestion_job.rs
hoophalab previously approved these changes Dec 5, 2025

@hoophalab hoophalab left a comment

Contributor

lgtm. one nitpick

}

pub async fn create(region: &str, access_key_id: &str, secret_access_key: &str) -> Self {
    let sqs_endpoint = format!("https://sqs.{region}.amazonaws.com");

@hoophalab hoophalab Dec 5, 2025

Contributor

We might need to revisit this later if we want to test the ingestion manager on LocalStack. I'm not sure whether we can accept an endpoint as a parameter (and in S3IngestionBaseConfig) and pass it through here.

But let's merge this as-is for now.

Member Author

Yeah, we can figure out how to pass the endpoint later. Should be fine.

Comment thread components/log-ingestor/src/aws_client_manager.rs Outdated
@LinZhihao-723 LinZhihao-723 merged commit c5e3fe0 into y-scope:main Dec 5, 2025
21 of 22 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026