fix(compression_scheduler): Only fetch datasets when using the clp-s storage engine (fixes #1214).#1215
Conversation
…storage engine (fixes y-scope#1214).
WalkthroughRefactors search_and_schedule_new_tasks to accept CLPConfig as the first parameter. Replaces clp_archive_output usage with clp_config.archive_output. Adds conditional loading of existing_datasets based on storage engine. Updates internal call sites (e.g., main) to pass clp_config. Docstring revised accordingly. Changes
Sequence Diagram(s)sequenceDiagram
participant Main
participant Scheduler as compression_scheduler.search_and_schedule_new_tasks
participant Storage
Main->>Scheduler: search_and_schedule_new_tasks(clp_config, db_conn, db_cursor, metadata_db_cfg)
alt clp_config.package.storage_engine == CLP_S
Scheduler->>Storage: fetch existing_datasets
Storage-->>Scheduler: existing_datasets
else
Scheduler->>Scheduler: existing_datasets = []
end
Scheduler->>Scheduler: add_dataset(..., archive_output=clp_config.archive_output)
Scheduler-->>Main: return
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Tip 🔌 Remote MCP (Model Context Protocol) integration is now available!Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
clp-s storage engine (fixes #1214).
There was a problem hiding this comment.
Actionable comments posted: 1
🔭 Outside diff range comments (2)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (2)
171-200: Also guard dataset creation under non-CLP_S engines to avoid unintended metadata operationsCurrently, add_dataset may still run when the storage engine is not CLP_S, because existing_datasets is empty and the condition checks only set membership. If non-CLP_S engines don’t support datasets in the same metadata DB, this can fail or create unintended rows. To fully align with the PR goal, gate dataset creation on CLP_S as well.
Apply this diff:
- if dataset is not None and dataset not in existing_datasets: - add_dataset( - db_conn, - db_cursor, - table_prefix, - dataset, - clp_config.archive_output, - ) - - # NOTE: This assumes we never delete a dataset when compression jobs are being scheduled - existing_datasets.add(dataset) + # Only manage datasets when using the CLP_S storage engine + if StorageEngine.CLP_S == clp_config.package.storage_engine: + if dataset is not None and dataset not in existing_datasets: + add_dataset( + db_conn, + db_cursor, + table_prefix, + dataset, + clp_config.archive_output, + ) + # NOTE: This assumes we never delete a dataset when compression jobs are being scheduled + existing_datasets.add(dataset)
404-406: Non-blocking: consider using clp_config.compression_scheduler.logging_level instead of envThis keeps the scheduler’s logging level sourced from its own config and avoids cross-scheduler leakage noted in prior discussions.
Apply this minimal change:
- set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL")) + set_logging_level(logger, clp_config.compression_scheduler.logging_level)Context: There’s a known issue where the start script sets CLP_LOGGING_LEVEL from the query scheduler. Sourcing directly from clp_config avoids that coupling.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py(3 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Learnt from: Bill-hbrhbr
PR: y-scope/clp#1122
File: components/core/src/clp/clp/CMakeLists.txt:175-195
Timestamp: 2025-07-23T09:54:45.185Z
Learning: In the CLP project, when reviewing CMakeLists.txt changes that introduce new compression library dependencies (BZip2, LibLZMA, LZ4, ZLIB), the team prefers to address conditional linking improvements in separate PRs rather than expanding the scope of focused migration PRs like the LibArchive task-based installation migration.
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
📚 Learning: 2025-08-08T06:59:42.436Z
Learnt from: junhaoliao
PR: y-scope/clp#1152
File: components/clp-package-utils/clp_package_utils/scripts/start_clp.py:613-613
Timestamp: 2025-08-08T06:59:42.436Z
Learning: In components/clp-package-utils/clp_package_utils/scripts/start_clp.py, generic_start_scheduler sets CLP_LOGGING_LEVEL using clp_config.query_scheduler.logging_level for both schedulers; compression scheduler should use its own logging level. Tracking via an issue created from PR #1152 discussion.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
📚 Learning: 2025-01-16T16:58:43.190Z
Learnt from: haiqi96
PR: y-scope/clp#651
File: components/clp-package-utils/clp_package_utils/scripts/compress.py:0-0
Timestamp: 2025-01-16T16:58:43.190Z
Learning: In the clp-package compression flow, path validation and error handling is performed at the scheduler level rather than in the compress.py script to maintain simplicity and avoid code duplication.
Applied to files:
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
🔇 Additional comments (5)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (5)
171-176: Conditional dataset fetch aligns with PR objectiveFetching existing datasets only when StorageEngine.CLP_S is in use is correct and makes the scheduler agnostic to other storage engines’ metadata layout.
198-199: Using clp_config.archive_output is the right consolidationThis removes the need to thread ArchiveOutput separately and keeps the function self-contained.
432-437: Call site updated to new signature (good)main correctly passes clp_config as the first argument to search_and_schedule_new_tasks.
156-161: Refactor Confirmed: all call sites updated
The search_and_schedule_new_tasks function now consistently uses the new signature(clp_config, db_conn, db_cursor, clp_metadata_db_connection_config).
No occurrences ofclp_archive_outputor old call patterns remain—ready to approve.
273-287: Incorrect storage‐engine assumption for tags table
The tags table is created bycreate_metadata_db_tablesonly whenstorage_engine != StorageEngine.CLP_S(i.e. for the default CLP engine), not under CLP_S. The proposed guard (StorageEngine.CLP_S == …) is inverted. If you need to skip tag operations when the table doesn’t exist, guard onStorageEngine.CLP(or equivalently!= CLP_S) instead.Likely an incorrect or invalid review comment.
| """ | ||
| For all jobs with PENDING status, splits the job into tasks and schedules them. | ||
| :param clp_config: | ||
| :param db_conn: | ||
| :param db_cursor: | ||
| :param clp_metadata_db_connection_config: | ||
| :param clp_archive_output: | ||
| """ |
There was a problem hiding this comment.
🧹 Nitpick (assertive)
Docstring is minimal; clarify clp_config usage and storage-engine–specific dataset handling
Please expand the docstring to reflect that datasets are fetched/managed only for the CLP_S storage engine and that archive_output is sourced from clp_config.
Apply this diff to improve the docstring:
def search_and_schedule_new_tasks(
clp_config: CLPConfig,
db_conn,
db_cursor,
clp_metadata_db_connection_config: Dict[str, Any],
):
"""
For all jobs with PENDING status, splits the job into tasks and schedules them.
- :param clp_config:
+ :param clp_config: Full CLP configuration. Note: dataset discovery/metadata updates are
+ performed only when clp_config.package.storage_engine == StorageEngine.CLP_S.
+ archive_output is taken from clp_config.archive_output.
:param db_conn:
:param db_cursor:
:param clp_metadata_db_connection_config:
"""📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| """ | |
| For all jobs with PENDING status, splits the job into tasks and schedules them. | |
| :param clp_config: | |
| :param db_conn: | |
| :param db_cursor: | |
| :param clp_metadata_db_connection_config: | |
| :param clp_archive_output: | |
| """ | |
| def search_and_schedule_new_tasks( | |
| clp_config: CLPConfig, | |
| db_conn, | |
| db_cursor, | |
| clp_metadata_db_connection_config: Dict[str, Any], | |
| ): | |
| """ | |
| For all jobs with PENDING status, splits the job into tasks and schedules them. | |
| :param clp_config: Full CLP configuration. Note: dataset discovery/metadata updates are | |
| performed only when clp_config.package.storage_engine == StorageEngine.CLP_S. | |
| archive_output is taken from clp_config.archive_output. | |
| :param db_conn: | |
| :param db_cursor: | |
| :param clp_metadata_db_connection_config: | |
| """ |
🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
around lines 162 to 168, the function docstring is too minimal; update it to
state that clp_config supplies archive_output and other relevant settings, and
that dataset fetching/management only occurs when the storage engine is CLP_S
(handle other storage engines differently or skip dataset operations). Modify
the docstring to list parameters (clp_config, db_conn, db_cursor,
clp_metadata_db_connection_config), describe how clp_config.archive_output is
used, and explicitly note storage-engine–specific behavior for dataset handling
so callers understand when datasets are fetched and when they are not.
…` storage engine (fixes y-scope#1214). (y-scope#1215) Co-authored-by: Bingran Hu <bingran.hu@yscope.com>
Description
As the title says. The PR implements the solution proposed in #1214.
Checklist
breaking change.
Validation performed
With the clp-text package:
With the clp-json package:
Summary by CodeRabbit