Skip to content

feat(log-ingestor): Add CompressionJobSubmitter for async compression-job submission and result retrieval.#1736

Merged
LinZhihao-723 merged 26 commits into
y-scope:mainfrom
LinZhihao-723:compression-job-submitter
Dec 7, 2025
Merged

feat(log-ingestor): Add CompressionJobSubmitter for async compression-job submission and result retrieval.#1736
LinZhihao-723 merged 26 commits into
y-scope:mainfrom
LinZhihao-723:compression-job-submitter

Conversation

@LinZhihao-723

@LinZhihao-723 LinZhihao-723 commented Dec 6, 2025

Copy link
Copy Markdown
Member

Description

As the title suggests. This PR adds CompressionJobSubmitter for compression job submission. It creates an async coroutine to submit and wait until the job completes. The submission and results will be logged in log-ingestor's logger.

As a result, we also update IngestorJobManager to create buffers using CompressionJobSubmitter.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensure all workflows passed.
  • Tested with a hardcoded ingestor job manager to ensure the logs can be successfully captured from S3 by the ingestion job and submitted to CLP for compression.

Summary by CodeRabbit

  • New Features

    • Added end-to-end compression job submission and monitoring using a database-backed background workflow.
    • Listener/ingestion flow now integrates with CLP configuration and credential-based archive output.
  • Refactor

    • Reworked ingestion manager to centralize CLP config, credentials and job submission wiring.
  • Chores

    • Added dependencies to support database access and compile-time constants.

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai

coderabbitai Bot commented Dec 6, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Adds a new CLP compression job submitter that enqueues and monitors jobs via MySQL, exports a new default dataset constant, introduces sqlx and const_format dependencies, and refactors the ingestion job manager to provide AWS credentials and a MySQL pool for submission.

Changes

Cohort / File(s) Summary
CLP Rust Utils Configuration
components/clp-rust-utils/src/clp_config/package.rs, components/clp-rust-utils/src/clp_config/package/config.rs
Added pub const DEFAULT_DATASET_NAME: &str = "default"; removed a blank line in the tests module (formatting-only).
Log Ingestor Dependencies
components/log-ingestor/Cargo.toml
Added dependencies: const_format = "0.2.35" and sqlx = { version = "0.8.6", features = ["runtime-tokio", "mysql"] }.
Compression Job Submission
components/log-ingestor/src/compression.rs, components/log-ingestor/src/compression/compression_job_submitter.rs
Added compression_job_submitter module and public re-export; new CompressionJobSubmitter struct with constructor and async background logic to serialize IO config, insert compression job rows into MySQL, and poll status with exponential backoff.
Ingestion Job Manager Integration
components/log-ingestor/src/ingestion_job_manager.rs
Added from_config async constructor accepting CLP config/credentials, added mysql_pool and archive_output_config fields, extracted AWS credentials from S3 input, and replaced the placeholder submitter with the new CompressionJobSubmitter integration.

Sequence Diagram

sequenceDiagram
    participant Listener as Log Buffer Listener
    participant Submitter as CompressionJobSubmitter
    participant DB as MySQL Database
    participant Monitor as Background Poll Task

    Listener->>Submitter: submit(buffer)
    activate Submitter
    Submitter->>Submitter: Populate IO config with buffer keys
    Submitter->>Monitor: Spawn background task
    deactivate Submitter

    activate Monitor
    Monitor->>DB: Insert serialized IO config -> compression_jobs
    activate DB
    DB-->>Monitor: return job_id
    deactivate DB

    loop Poll with exponential backoff (<=30s)
        Monitor->>DB: Query job status by job_id
        activate DB
        DB-->>Monitor: status (Pending/Success/Failure/Unknown)
        deactivate DB

        alt Success
            Monitor->>Monitor: Log completion and exit
        else Failure
            Monitor->>Monitor: Log failure and exit
        else Pending
            Monitor->>Monitor: Wait/backoff and continue
        end
    end
    deactivate Monitor
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Review focus:
    • components/log-ingestor/src/compression/compression_job_submitter.rs — serialization, DB insertion/query correctness, error handling, and polling/backoff logic.
    • components/log-ingestor/src/ingestion_job_manager.rs — credential extraction, MySQL pool creation/lifecycle, and wiring of the new submitter into listener creation.
    • components/log-ingestor/Cargo.toml — ensure sqlx features and runtime selection align with project async runtime and build setup.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly and accurately reflects the main change: adding CompressionJobSubmitter for async compression-job submission and result retrieval.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fefa892 and 1480f13.

📒 Files selected for processing (1)
  • components/log-ingestor/src/compression/compression_job_submitter.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (9)
📓 Common learnings
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1178
File: components/clp-package-utils/clp_package_utils/controller.py:217-223
Timestamp: 2025-09-25T05:13:13.298Z
Learning: The compression scheduler service in CLP runs with CLP_UID_GID (current user's UID:GID) rather than CLP_SERVICE_CONTAINER_UID_GID (999:999), unlike infrastructure services such as database, queue, redis, and results cache which run with the service container UID:GID.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-07-23T09:54:45.185Z
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-09-19T18:28:26.747Z
Learnt from: gibber9809
Repo: y-scope/clp PR: 1169
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:463-469
Timestamp: 2025-09-19T18:28:26.747Z
Learning: In the compression scheduler (components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py), the SIGTERM handling uses a coarse-grained approach that only checks received_sigterm before calling search_and_schedule_new_tasks. The maintainers consider the race condition where SIGTERM arrives mid-execution to be benign, as scheduling a few extra tasks during shutdown is acceptable and will be handled by cleanup mechanisms.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-09-15T22:20:40.750Z
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-08-09T04:07:27.083Z
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1156
File: components/core/CMakeLists.txt:772-772
Timestamp: 2025-08-09T04:07:27.083Z
Learning: In the CLP project's CMakeLists.txt, when reviewing changes related to the ${zstd_TARGET} variable usage in test linking, the team is planning a refactoring PR to improve this mechanism. Guards for undefined target variables should be deferred to that separate PR rather than being added in focused dependency migration PRs.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-08-25T06:29:59.610Z
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1261
File: .github/workflows/clp-core-build.yaml:294-332
Timestamp: 2025-08-25T06:29:59.610Z
Learning: In the CLP project, Bill-hbrhbr prefers a "fail fast" approach for CI workflows - allowing potential command availability issues (like getconf in musllinux) to surface through CI failures rather than preemptively adding fallback logic, as they will fix issues when they occur.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
🧬 Code graph analysis (1)
components/log-ingestor/src/compression/compression_job_submitter.rs (2)
components/clp-py-utils/clp_py_utils/clp_config.py (3)
  • AwsAuthentication (547-577)
  • S3Config (580-584)
  • ArchiveOutput (693-706)
components/job-orchestration/job_orchestration/scheduler/job_config.py (2)
  • OutputConfig (47-53)
  • S3InputConfig (32-44)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: build (macos-15)
  • GitHub Check: package-image
  • GitHub Check: rust-checks
🔇 Additional comments (4)
components/log-ingestor/src/compression/compression_job_submitter.rs (4)

1-26: LGTM: Imports and constants are well-organized.

The imports are appropriate for the async database operations and CLP integration. The table name constant follows best practices for avoiding magic strings in SQL queries.


34-48: LGTM: Clean async trait implementation.

The submit method correctly clones the template config, populates it with buffer keys, and spawns the submission task. The fire-and-forget spawn pattern has been acknowledged as intentional in previous reviews.


50-103: LGTM: Constructor properly initializes the submitter.

The factory function correctly builds the S3 input and output configurations from the provided parameters. The DEFAULT_DATASET_NAME fallback (lines 75-79) is documented as a workaround for issue #1735, which appropriately tracks this technical debt.


105-188: LGTM: Job submission and polling logic is sound.

The helper function correctly:

  • Serializes the IO config using BrotliMsgpack
  • Executes a parameterized INSERT query (SQL-injection safe)
  • Implements exponential backoff (1s → 30s cap)
  • Handles all status transitions (Failed/Succeeded explicitly; Pending/Running implicitly via _ pattern)
  • Logs all errors without propagating them (as documented)

The absence of a polling timeout and detached execution have been acknowledged as intentional design decisions in previous reviews.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@LinZhihao-723 LinZhihao-723 marked this pull request as ready for review December 6, 2025 04:27
@LinZhihao-723 LinZhihao-723 requested a review from a team as a code owner December 6, 2025 04:27

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bfc474f and fefa892.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (6)
  • components/clp-rust-utils/src/clp_config/package.rs (1 hunks)
  • components/clp-rust-utils/src/clp_config/package/config.rs (0 hunks)
  • components/log-ingestor/Cargo.toml (1 hunks)
  • components/log-ingestor/src/compression.rs (1 hunks)
  • components/log-ingestor/src/compression/compression_job_submitter.rs (1 hunks)
  • components/log-ingestor/src/ingestion_job_manager.rs (4 hunks)
💤 Files with no reviewable changes (1)
  • components/clp-rust-utils/src/clp_config/package/config.rs
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: Bill-hbrhbr
Repo: y-scope/clp PR: 1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Learnt from: quinntaylormitchell
Repo: y-scope/clp PR: 1125
File: components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py:267-291
Timestamp: 2025-09-15T22:20:40.750Z
Learning: For CLP compression jobs, the team has decided to fail the entire job immediately upon encountering any invalid input path, rather than continuing to process valid paths. This decision was made during PR #1125 development.
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1178
File: components/clp-package-utils/clp_package_utils/controller.py:217-223
Timestamp: 2025-09-25T05:13:13.298Z
Learning: The compression scheduler service in CLP runs with CLP_UID_GID (current user's UID:GID) rather than CLP_SERVICE_CONTAINER_UID_GID (999:999), unlike infrastructure services such as database, queue, redis, and results cache which run with the service container UID:GID.
Learnt from: haiqi96
Repo: y-scope/clp PR: 651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 549
File: components/core/tests/test-ir_encoding_methods.cpp:1180-1186
Timestamp: 2024-10-01T07:59:11.208Z
Learning: In the context of loop constructs, LinZhihao-723 prefers using `while (true)` loops and does not consider alternative loop constructs necessarily more readable.
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.

Applied to files:

  • components/clp-rust-utils/src/clp_config/package.rs
📚 Learning: 2025-09-28T15:00:22.170Z
Learnt from: LinZhihao-723
Repo: y-scope/clp PR: 1340
File: components/job-orchestration/job_orchestration/executor/compress/compression_task.py:528-528
Timestamp: 2025-09-28T15:00:22.170Z
Learning: In components/job-orchestration/job_orchestration/executor/compress/compression_task.py, there is a suggestion to refactor from passing logger as a parameter through multiple functions to creating a ClpCompressor class that takes the logger as a class member, with current helper functions becoming private member functions.

Applied to files:

  • components/log-ingestor/src/compression/compression_job_submitter.rs
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/log-ingestor/Cargo.toml
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/log-ingestor/Cargo.toml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: build (macos-15)
  • GitHub Check: build (ubuntu-24.04)
  • GitHub Check: rust-checks
🔇 Additional comments (8)
components/clp-rust-utils/src/clp_config/package.rs (1)

7-9: LGTM!

The constant is appropriately named and documented as a mirror of the Python configuration. It's correctly used in the compression job submitter for the dataset fallback.

components/log-ingestor/src/compression.rs (1)

2-6: LGTM!

The module declaration and re-export follow the existing pattern in this file, maintaining consistency with buffer and listener modules.

components/log-ingestor/src/compression/compression_job_submitter.rs (2)

49-102: LGTM!

The constructor properly builds the IO config template from the provided configurations. The #[must_use] attribute and fallback handling for the dataset (lines 74-79) are appropriate.


156-177: LGTM!

The status handling logic correctly maps numeric status codes to the typed enum and handles all terminal states (Failed, Succeeded) while allowing intermediate states to continue polling.

components/log-ingestor/src/ingestion_job_manager.rs (3)

38-46: LGTM!

The new fields archive_output_config and mysql_pool are appropriately added to support CLP integration. The struct remains cohesive with related configuration grouped together.


266-278: LGTM!

The updated create_listener method cleanly integrates the new CompressionJobSubmitter with all required dependencies. The wiring between the MySQL pool, AWS credentials, archive config, and ingestion job config is properly handled.


68-78: Verify that AwsAuthentication enum doesn't have other variants besides Credentials.

The nested match on line 69-71 assumes AwsAuthentication::Credentials is the only variant. If the AwsAuthentication enum has additional variants (e.g., IAM role-based authentication), this match will fail to compile, forcing proper handling of new authentication types. Confirm this is the intended behavior for the codebase.

components/log-ingestor/Cargo.toml (1)

14-17: Dependency versions are already current. Both const_format = "0.2.35" and sqlx = { version = "0.8.6", ... } are the latest stable versions available on crates.io as of December 2025. The features specified for sqlx are appropriate for the async Tokio runtime and MySQL support.

Likely an incorrect or invalid review comment.

Comment thread components/log-ingestor/src/ingestion_job_manager.rs
hoophalab
hoophalab previously approved these changes Dec 7, 2025

@hoophalab hoophalab left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm. One high-level question: Do you want to make BufferSubmitter::submit blocking rather than fire and forget? return void rather than Result<()>? Otherwise, it might be better to return void rather than Result<()> in both Buffer::submit and BufferSubmitter::submit.

One typo in the title:

feat(log-ingestor): Add `CompressionJobSubmitter `for async compression job submission and result retrieval.

io_config_template: ClpIoConfig,
}

#[async_trait::async_trait]

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[async_trait::async_trait]
#[async_trait]

import async trait to keep consistent with other files?

@LinZhihao-723

Copy link
Copy Markdown
Member Author

lgtm. One high-level question: Do you want to make BufferSubmitter::submit blocking rather than fire and forget? return void rather than Result<()>? Otherwise, it might be better to return void rather than Result<()> in both Buffer::submit and BufferSubmitter::submit.

One typo in the title:

feat(log-ingestor): Add `CompressionJobSubmitter `for async compression job submission and result retrieval.

We don't want to make it blocking, as the compression job may take a long time to finish. In the high-level design, we will eventually implement a proper compression task that:

  • Directly submits the compression job to Spider
  • Wait until the job completes
  • Report any errors to CLP DB

And such a compression task is recoverable from a failure.

The current implementation of compression job submitter is a simplified version of this model, while:

  • Instead of directly submitting a compression job to Spider, we submit it to CLP DB and waiting for the package side scheduler to pick it up
  • Errors and progress are reflected to log-ingestor log file directly. No DB operations for persistency for now

You can find the entire flow in this doc.

@LinZhihao-723 LinZhihao-723 changed the title feat(log-ingestor): Add CompressionJobSubmitter for asyn compression job submission and result retrieval. feat(log-ingestor): Add CompressionJobSubmitter for async compression job submission and result retrieval. Dec 7, 2025
@LinZhihao-723

Copy link
Copy Markdown
Member Author

lgtm. One high-level question: Do you want to make BufferSubmitter::submit blocking rather than fire and forget? return void rather than Result<()>? Otherwise, it might be better to return void rather than Result<()> in both Buffer::submit and BufferSubmitter::submit.

One typo in the title:

feat(log-ingestor): Add `CompressionJobSubmitter `for async compression job submission and result retrieval.

And in terms of return type, I think making it a Result is more future proof, supposing we're gonna implement a more complicated compression job submitter that supports the failure model proposed in the above doc.

@LinZhihao-723 LinZhihao-723 changed the title feat(log-ingestor): Add CompressionJobSubmitter for async compression job submission and result retrieval. feat(log-ingestor): Add CompressionJobSubmitter for async compression-job submission and result retrieval. Dec 7, 2025
@LinZhihao-723 LinZhihao-723 merged commit 6ee5dcf into y-scope:main Dec 7, 2025
22 checks passed
junhaoliao pushed a commit to junhaoliao/clp that referenced this pull request May 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants