Skip to content

fix(clp-package): Add dataset to metadata database after input paths are processed for compression jobs (fixes #2091).#2092

Merged
quinntaylormitchell merged 4 commits into
y-scope:mainfrom
quinntaylormitchell:compression-dataset-addition
Mar 25, 2026
Merged

fix(clp-package): Add dataset to metadata database after input paths are processed for compression jobs (fixes #2091).#2092
quinntaylormitchell merged 4 commits into
y-scope:mainfrom
quinntaylormitchell:compression-dataset-addition

Conversation

@quinntaylormitchell

@quinntaylormitchell quinntaylormitchell commented Mar 12, 2026

Copy link
Copy Markdown
Collaborator

Description

This PR addresses issue #2091 by calling _ensure_dataset_exists() after input paths are processed for a compression job, not before.

Note: My only concern with this implementation is that this fix only protects against path-processing failures. Datasets will still be added to the metadata database if the compression job fails in the core. I tested the idea of calling _ensure_dataset_exists() from _complete_compression_job() instead, and while that did strictly fix the issue (in the sense that failed jobs were no longer adding their datasets to the metadata database), it also broke all compression, because compression jobs need the dataset to exist in the metadata database before the compression job starts.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

Ran the replication steps described in issue #2091; datasets are only added to the metadata database if the paths in the compression command are valid.

Summary by CodeRabbit

  • Refactor
    • Dataset validation in the compression scheduler now occurs per job after input paths are enumerated and buffered, and is performed only when the CLP_S storage engine is active, improving per-job validation timing.

@quinntaylormitchell quinntaylormitchell requested a review from a team as a code owner March 12, 2026 16:48
@coderabbitai

coderabbitai Bot commented Mar 12, 2026

Copy link
Copy Markdown
Contributor

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 90a11c6d-425c-4625-897a-9b644de9e7b9

📥 Commits

Reviewing files that changed from the base of the PR and between 5cd037f and 361d3bb.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Walkthrough

Dataset existence validation in search_and_schedule_new_tasks was moved from the function start to after per-job input path processing. The pre-fetch of existing_datasets was removed; when StorageEngine.CLP_S is active the code now recomputes the job's table_prefix/dataset and calls _ensure_dataset_exists(...) after paths_to_compress_buffer.flush() for each job.

Changes

Cohort / File(s) Summary
Compression Scheduler Logic
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
Removed upfront per-function dataset initialization and call to _ensure_dataset_exists(...). Moved dataset existence check into the per-job loop after paths_to_compress_buffer.flush(), gated by StorageEngine.CLP_S, recomputing table_prefix and dataset from the current job before calling _ensure_dataset_exists(...) and then _batch_and_submit_tasks(...).

Sequence Diagram(s)

sequenceDiagram
    participant Scheduler as Scheduler
    participant Buffer as PathsBuffer
    participant CLP as StorageEngine/CLP_S
    participant DatasetSvc as DatasetService
    participant Submitter as TaskSubmitter

    Scheduler->>Buffer: enumerate & buffer input paths
    Buffer->>Buffer: flush()
    alt StorageEngine == CLP_S
        Scheduler->>CLP: read job.clp_io_config.input.dataset
        CLP->>DatasetSvc: _ensure_dataset_exists(dataset)
    end
    Scheduler->>Submitter: _batch_and_submit_tasks(...)
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related issues

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change: deferring dataset addition to the metadata database until after input paths are processed, and it directly references the issue being fixed.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 Ruff (0.15.6)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Unexpected Ruff issue shape at index 18


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 394-406: The code fetches the full dataset set inside each CLP_S
job block causing repeated DB reads; move the call to fetch_existing_datasets
out of the per-job branch so a single cached existing_datasets set is
initialized lazily once (before looping jobs when StorageEngine.CLP_S is
possible) and then reused; keep using _ensure_dataset_exists(clp_config,
db_context, table_prefix, dataset, existing_datasets) to update the cache in
place for cache misses, and retain table_prefix and dataset usage as-is
(symbols: StorageEngine.CLP_S, fetch_existing_datasets, existing_datasets,
_ensure_dataset_exists, clp_metadata_db_connection_config, clp_config,
db_context).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: f46f1ad1-87cb-443a-b737-8afa0e30ff77

📥 Commits

Reviewing files that changed from the base of the PR and between cef42bb and 5cd037f.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

@junhaoliao junhaoliao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rest lgtm

Comment on lines +395 to +397
existing_datasets: set[str] = fetch_existing_datasets(
db_context.cursor, clp_metadata_db_connection_config["table_prefix"]
)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i agree with the rabbit's comment at https://github.com/y-scope/clp/pull/2092/changes#r2966540337

while not too expensive, fetch_existing_datasets() reads from the DB and should be avoided in the loop.

how about adding this before the for-loop instead

existing_datasets: set[str] = set()
if StorageEngine.CLP_S == clp_config.package.storage_engine:
    existing_datasets = fetch_existing_datasets(...)

i acknowledge there could be risk if the user deletes a dataset after we call fetch_existing_datasets() and this for job_row in jobs: run for long, but that's more of an issue in the dataset manager's implementation (we should invalidate / empty a dataset table than actually removing it to avoid such issues) than the scheduler's

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure that's a good point. I will restore the section before the for-loop.

@junhaoliao junhaoliao left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

title lgtm too

@quinntaylormitchell quinntaylormitchell merged commit 1b44f17 into y-scope:main Mar 25, 2026
22 checks passed
@junhaoliao junhaoliao added this to the March 2026 milestone Apr 24, 2026
junhaoliao added a commit to junhaoliao/clp that referenced this pull request May 17, 2026
…are processed for compression jobs (fixes y-scope#2091). (y-scope#2092)

Co-authored-by: Junhao Liao <junhao.liao@yscope.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants