feat(log-ingestor): Add CompressionJobSubmitter for async compression-job submission and result retrieval.#1736
Conversation
WalkthroughAdds a new CLP compression job submitter that enqueues and monitors jobs via MySQL, exports a new default dataset constant, introduces sqlx and const_format dependencies, and refactors the ingestion job manager to provide AWS credentials and a MySQL pool for submission. Changes
Sequence DiagramsequenceDiagram
participant Listener as Log Buffer Listener
participant Submitter as CompressionJobSubmitter
participant DB as MySQL Database
participant Monitor as Background Poll Task
Listener->>Submitter: submit(buffer)
activate Submitter
Submitter->>Submitter: Populate IO config with buffer keys
Submitter->>Monitor: Spawn background task
deactivate Submitter
activate Monitor
Monitor->>DB: Insert serialized IO config -> compression_jobs
activate DB
DB-->>Monitor: return job_id
deactivate DB
loop Poll with exponential backoff (<=30s)
Monitor->>DB: Query job status by job_id
activate DB
DB-->>Monitor: status (Pending/Success/Failure/Unknown)
deactivate DB
alt Success
Monitor->>Monitor: Log completion and exit
else Failure
Monitor->>Monitor: Log failure and exit
else Pending
Monitor->>Monitor: Wait/backoff and continue
end
end
deactivate Monitor
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: ASSERTIVE Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧠 Learnings (9)📓 Common learnings📚 Learning: 2025-09-28T15:00:22.170ZApplied to files:
📚 Learning: 2025-07-23T09:54:45.185ZApplied to files:
📚 Learning: 2025-09-19T18:28:26.747ZApplied to files:
📚 Learning: 2025-08-08T06:59:42.436ZApplied to files:
📚 Learning: 2025-09-15T22:20:40.750ZApplied to files:
📚 Learning: 2025-01-16T16:58:43.190ZApplied to files:
📚 Learning: 2025-08-09T04:07:27.083ZApplied to files:
📚 Learning: 2025-08-25T06:29:59.610ZApplied to files:
🧬 Code graph analysis (1)components/log-ingestor/src/compression/compression_job_submitter.rs (2)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
🔇 Additional comments (4)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
components/clp-rust-utils/src/clp_config/package.rs(1 hunks)components/clp-rust-utils/src/clp_config/package/config.rs(0 hunks)components/log-ingestor/Cargo.toml(1 hunks)components/log-ingestor/src/compression.rs(1 hunks)components/log-ingestor/src/compression/compression_job_submitter.rs(1 hunks)components/log-ingestor/src/ingestion_job_manager.rs(4 hunks)
💤 Files with no reviewable changes (1)
- components/clp-rust-utils/src/clp_config/package/config.rs
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1178
File: components/clp-package-utils/clp_package_utils/controller.py:217-223
Timestamp: 2025-09-25T05:13:13.298Z
Learning: The compression scheduler service in CLP runs with CLP_UID_GID (current user's UID:GID) rather than CLP_SERVICE_CONTAINER_UID_GID (999:999), unlike infrastructure services such as database, queue, redis, and results cache which run with the service container UID:GID.
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.
Applied to files:
components/clp-rust-utils/src/clp_config/package.rs
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Applied to files:
components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/log-ingestor/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: build (macos-15)
- GitHub Check: build (ubuntu-24.04)
- GitHub Check: rust-checks
🔇 Additional comments (8)
components/clp-rust-utils/src/clp_config/package.rs (1)
7-9: LGTM!The constant is appropriately named and documented as a mirror of the Python configuration. It's correctly used in the compression job submitter for the dataset fallback.
components/log-ingestor/src/compression.rs (1)
2-6: LGTM!The module declaration and re-export follow the existing pattern in this file, maintaining consistency with
bufferandlistenermodules.components/log-ingestor/src/compression/compression_job_submitter.rs (2)
49-102: LGTM!The constructor properly builds the IO config template from the provided configurations. The
#[must_use]attribute and fallback handling for the dataset (lines 74-79) are appropriate.
156-177: LGTM!The status handling logic correctly maps numeric status codes to the typed enum and handles all terminal states (Failed, Succeeded) while allowing intermediate states to continue polling.
components/log-ingestor/src/ingestion_job_manager.rs (3)
38-46: LGTM!The new fields
archive_output_configandmysql_poolare appropriately added to support CLP integration. The struct remains cohesive with related configuration grouped together.
266-278: LGTM!The updated
create_listenermethod cleanly integrates the newCompressionJobSubmitterwith all required dependencies. The wiring between the MySQL pool, AWS credentials, archive config, and ingestion job config is properly handled.
68-78: Verify thatAwsAuthenticationenum doesn't have other variants besidesCredentials.The nested match on line 69-71 assumes
AwsAuthentication::Credentialsis the only variant. If theAwsAuthenticationenum has additional variants (e.g., IAM role-based authentication), this match will fail to compile, forcing proper handling of new authentication types. Confirm this is the intended behavior for the codebase.components/log-ingestor/Cargo.toml (1)
14-17: Dependency versions are already current. Bothconst_format = "0.2.35"andsqlx = { version = "0.8.6", ... }are the latest stable versions available on crates.io as of December 2025. The features specified forsqlxare appropriate for the async Tokio runtime and MySQL support.Likely an incorrect or invalid review comment.
There was a problem hiding this comment.
lgtm. One high-level question: Do you want to make BufferSubmitter::submit blocking rather than fire and forget? return void rather than Result<()>? Otherwise, it might be better to return void rather than Result<()> in both Buffer::submit and BufferSubmitter::submit.
One typo in the title:
feat(log-ingestor): Add `CompressionJobSubmitter `for async compression job submission and result retrieval.
| io_config_template: ClpIoConfig, | ||
| } | ||
|
|
||
| #[async_trait::async_trait] |
There was a problem hiding this comment.
| #[async_trait::async_trait] | |
| #[async_trait] |
import async trait to keep consistent with other files?
We don't want to make it blocking, as the compression job may take a long time to finish. In the high-level design, we will eventually implement a proper compression task that:
And such a compression task is recoverable from a failure. The current implementation of compression job submitter is a simplified version of this model, while:
You can find the entire flow in this doc. |
CompressionJobSubmitter for asyn compression job submission and result retrieval.CompressionJobSubmitter for async compression job submission and result retrieval.
And in terms of return type, I think making it a Result is more future proof, supposing we're gonna implement a more complicated compression job submitter that supports the failure model proposed in the above doc. |
CompressionJobSubmitter for async compression job submission and result retrieval.CompressionJobSubmitter for async compression-job submission and result retrieval.
…on-job submission and result retrieval. (y-scope#1736)
Description
As the title suggests. This PR adds
CompressionJobSubmitterfor compression job submission. It creates an async coroutine to submit and wait until the job completes. The submission and results will be logged in log-ingestor's logger.As a result, we also update
IngestorJobManagerto create buffers usingCompressionJobSubmitter.Checklist
breaking change.
Validation performed
Summary by CodeRabbit
New Features
Refactor
Chores
✏️ Tip: You can customize this high-level summary in your review settings.