fix: correctly propagate checkpoint dependencies across DAG expansions#3864
fix: correctly propagate checkpoint dependencies across DAG expansions#3864jackerschott wants to merge 22 commits into
Conversation
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughReworks checkpoint dependency propagation in src/snakemake/dag.py to an iterative convergence loop that detects completed checkpoint outputs on disk, rescans affected consumers via self.depending, re-evaluates and replaces jobs until no new checkpoint_target inputs appear, and adds a regression test for second-order checkpoint behavior. (≤50 words) ChangesCheckpoint dependency convergence
Sequence Diagram(s)sequenceDiagram
participant DAG as DAG
participant CP as CheckpointJob
participant FS as Filesystem
participant Consumer as DownstreamRule
Note over DAG: update_checkpoint_dependencies (iterative convergence)
DAG->>CP: identify candidate completed checkpoint jobs (jobs? skips FS checks if provided)
CP->>FS: check outputs on disk / finished status
FS-->>CP: existence/status
CP-->>DAG: report completed checkpoint outputs
DAG->>DAG: merge evicted outputs into checkpoints.created_output
DAG->>DAG: compute affected consumers via self.depending
DAG->>Consumer: re-evaluate input functions (await job.updated())
Consumer-->>DAG: job changes / checkpoint_target inputs
DAG->>DAG: await replace_job(...) for changed jobs
alt new checkpoint_target inputs detected and rounds < 100
DAG->>DAG: repeat convergence round (rescan)
else no changes or cap reached
DAG-->>DAG: finish convergence or raise WorkflowError
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (3)
src/snakemake/dag.py (3)
2152-2157:jobs is Nonecondition appears unreachable.The condition
jobs is Noneon line 2156 can never beTruebecause this function is always called withjobs or self.jobs(lines 2178, 2206), which ensures the argument is neverNone. The comment mentions that "jobs (if not None) are finished jobs", but this distinction is lost due to theorpattern.If the intent is to skip the
is_output_presentcheck when the caller passes specific finished jobs, consider passing a separate flag or restructuring the call pattern.
2215-2217: Repeatedmap()call inside loop is inefficient.
map(lambda job: job.rule, consumers_prior)creates a new iterator for each consumer inconsumers_posterior. Consider precomputing a set before the loop.+ rules_prior = {job.rule for job in consumers_prior} for consumer in consumers_posterior: - if consumer.rule not in map(lambda job: job.rule, consumers_prior): + if consumer.rule not in rules_prior: maybe_has_incomplete_consumers = True
2190-2223: Consider adding a maximum iteration safeguard.The
while Trueloop relies onmaybe_has_incomplete_consumerseventually becomingFalseto terminate. While this should converge in normal cases, complex checkpoint dependencies or edge cases could potentially cause excessive iterations.Consider adding a maximum iteration limit with a warning, similar to other Snakemake safeguards:
MAX_CHECKPOINT_UPDATE_ROUNDS = 100 # or configurable i = 1 while True: if i > MAX_CHECKPOINT_UPDATE_ROUNDS: logger.warning(f"Checkpoint update did not converge after {MAX_CHECKPOINT_UPDATE_ROUNDS} rounds") break ...
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/snakemake/dag.py(4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of theselfargument of methods.
Do not suggest type annotation of theclsargument of classmethods.
Do not suggest return type annotation if a function or method does not contain areturnstatement.
Files:
src/snakemake/dag.py
🧠 Learnings (4)
📓 Common learnings
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
📚 Learning: 2024-10-14T09:42:11.571Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-11-12T12:08:20.342Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-06T14:09:54.370Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3117
File: tests/test_wrapper/Snakefile:11-11
Timestamp: 2024-10-06T14:09:54.370Z
Learning: Changes made within test cases, such as in `tests/test_wrapper/Snakefile`, are for testing purposes and do not require updates to the project documentation.
Applied to files:
src/snakemake/dag.py
🧬 Code graph analysis (1)
src/snakemake/dag.py (1)
src/snakemake/checkpoints.py (2)
created_output(21-22)created_output(41-42)
447d289 to
fc4b200
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
2225-2227: Consider optimizing repeated map creation.The
map()object is recreated for each consumer in the loop, which is inefficient. Consider creating a set of prior rules once before the loop.Apply this diff to optimize:
+ prior_rules = {job.rule for job in consumers_prior} for consumer in consumers_posterior: - if consumer.rule not in map(lambda job: job.rule, consumers_prior): + if consumer.rule not in prior_rules: maybe_has_incomplete_consumers = True
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/snakemake/dag.py(4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of theselfargument of methods.
Do not suggest type annotation of theclsargument of classmethods.
Do not suggest return type annotation if a function or method does not contain areturnstatement.
Files:
src/snakemake/dag.py
🧠 Learnings (5)
📓 Common learnings
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
📚 Learning: 2024-10-14T09:42:11.571Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-11-12T12:08:20.342Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-06T14:09:54.370Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3117
File: tests/test_wrapper/Snakefile:11-11
Timestamp: 2024-10-06T14:09:54.370Z
Learning: Changes made within test cases, such as in `tests/test_wrapper/Snakefile`, are for testing purposes and do not require updates to the project documentation.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.
Applied to files:
src/snakemake/dag.py
🧬 Code graph analysis (1)
src/snakemake/dag.py (1)
src/snakemake/checkpoints.py (2)
created_output(21-22)created_output(41-42)
|
Hm, it looks like I get a different problem with this fix in a real setting: Edit: I tried to downgrade to version 9.5.1 (with my fix) which I was previously using and that fixed this second issue (now it runs smoothly). |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
2228-2230: Inefficient membership test with map().Using
map()inside theinoperator creates a new map object on each iteration and performs a linear search through it. This is inefficient, especially with many consumers.Precompute the set of prior rules outside the loop:
+ prior_consumer_rules = {job.rule for job in consumers_prior} for consumer in consumers_posterior: - if consumer.rule not in map(lambda job: job.rule, consumers_prior): + if consumer.rule not in prior_consumer_rules: maybe_has_incomplete_consumers = True
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/snakemake/dag.py(4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of theselfargument of methods.
Do not suggest type annotation of theclsargument of classmethods.
Do not suggest return type annotation if a function or method does not contain areturnstatement.
Files:
src/snakemake/dag.py
🧠 Learnings (7)
📓 Common learnings
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
📚 Learning: 2024-10-14T09:42:11.571Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-11-12T12:08:20.342Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-06T14:09:54.370Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3117
File: tests/test_wrapper/Snakefile:11-11
Timestamp: 2024-10-06T14:09:54.370Z
Learning: Changes made within test cases, such as in `tests/test_wrapper/Snakefile`, are for testing purposes and do not require updates to the project documentation.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-05-23T09:40:24.474Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3600
File: src/snakemake/jobs.py:960-964
Timestamp: 2025-05-23T09:40:24.474Z
Learning: In the `cleanup` method of the `Job` class in `src/snakemake/jobs.py`, files in the `to_remove` list should be formatted with `fmt_iofile` without specifying `as_output=True` or `as_input=True` parameters, as these files should be displayed as generic files rather than specifically as output files.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-05-06T01:37:23.382Z
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
Applied to files:
src/snakemake/dag.py
🧬 Code graph analysis (1)
src/snakemake/dag.py (2)
src/snakemake/jobs.py (6)
jobs(1409-1410)jobs(1413-1414)output(346-347)output(350-351)output(1580-1586)updated(377-395)src/snakemake/checkpoints.py (2)
created_output(21-22)created_output(41-42)
🔇 Additional comments (1)
src/snakemake/dag.py (1)
2207-2213: Verify checkpoint target change detection logic.The code only flags
maybe_has_incomplete_consumerswhen checkpoint targets are REMOVED (target_inputs_prior.difference(target_inputs_posterior)), but not when they are ADDED.For "second order" checkpoint scenarios mentioned in the PR objectives, shouldn't ANY change to checkpoint targets (additions or removals) indicate potential incomplete consumers?
Consider whether the condition should detect any changes:
- if len(target_inputs_prior.difference(target_inputs_posterior)) > 0: + if target_inputs_prior != target_inputs_posterior: maybe_has_incomplete_consumers = TruePlease verify whether the current logic (only detecting removals) is intentional and correct for the checkpoint dependency update semantics.
johanneskoester
left a comment
There was a problem hiding this comment.
Fantastic, thanks a lot!
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
2213-2214: Consider checking for both added and removed target inputs.The current check only detects when
checkpoint_targetinputs are removed (resolved), but doesn't detect when new ones are added. While new consumer rules appearing (line 2230) might cover this indirectly, an explicit check would be more robust:if target_inputs_prior != target_inputs_posterior: maybe_has_incomplete_consumers = TrueThis would catch both checkpoint resolution (removal) and new checkpoint dependencies (addition).
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/snakemake/dag.py(4 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of theselfargument of methods.
Do not suggest type annotation of theclsargument of classmethods.
Do not suggest return type annotation if a function or method does not contain areturnstatement.
Files:
src/snakemake/dag.py
🧠 Learnings (6)
📓 Common learnings
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
📚 Learning: 2024-10-14T09:42:11.571Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-11-12T12:08:20.342Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-05-23T09:40:24.474Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3600
File: src/snakemake/jobs.py:960-964
Timestamp: 2025-05-23T09:40:24.474Z
Learning: In the `cleanup` method of the `Job` class in `src/snakemake/jobs.py`, files in the `to_remove` list should be formatted with `fmt_iofile` without specifying `as_output=True` or `as_input=True` parameters, as these files should be displayed as generic files rather than specifically as output files.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-05-06T01:37:23.382Z
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
Applied to files:
src/snakemake/dag.py
🧬 Code graph analysis (1)
src/snakemake/dag.py (1)
src/snakemake/checkpoints.py (2)
created_output(21-22)created_output(41-42)
🔇 Additional comments (5)
src/snakemake/dag.py (5)
394-396: LGTM! Assertion formatting improved.The change from tuple-style to proper assertion with separate message argument is cleaner and more idiomatic.
2157-2198: Helper functions are well-structured.The extracted helper functions (
compute_completed_checkpoint_jobs,compute_checkpoint_consumers,flag_checkpoints_as_completed,get_checkpoint_target_inputs) improve readability and modularity. The early return when no consumers exist is an appropriate optimization.
2753-2762: LGTM! Formatting improvement.The refactored string construction with explicit
textwrap.dedent()andformat()is clearer and maintains the same output.
3068-3068: LGTM! Log message consolidated.The single-line string is cleaner and maintains the same message.
2154-2247: Verify whether iterative checkpoint dependency updates trigger job scheduling operations in mismatched process contexts.The AssertionError in v9.14 with SLURM scheduler is related to changes in PR #3850 for filesystem storage performance. The iterative checkpoint update logic here performs up to 100 rounds of
replace_job()andupdate_needrun()calls. While these operations occur within the async checkpoint update method during job completion callbacks, each iteration could mark jobs as ready or modify DAG state in ways that trigger downstream scheduler operations. If those operations attempt to download storage inputs (the code path with theis_main_processassertion), a mismatch in process context between the callback and main scheduler flow could cause the assertion to fail. Verify that the async wrapper properly preserves process context through all iteration rounds.
johanneskoester
left a comment
There was a problem hiding this comment.
Just one final thing: could you add a testcase derived from the example in your issue #3862?
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In @src/snakemake/dag.py:
- Around line 2156-2250: In update_checkpoint_dependencies, fix three bugs:
invert the check_is_output_present flag passed to
compute_completed_checkpoint_jobs so it is True when jobs is None (i.e., only
check outputs when not given), improve convergence detection by comparing
target_inputs_prior != target_inputs_posterior (or use symmetric difference)
instead of only checking removed inputs and also compare consumer identity/rules
using sets rather than map+lambda for consumers_posterior vs consumers_prior,
and replace the hard exit(1) with raising a specific exception (or returning
False) and correct the loop-limit check to i >= MAX_CHECKPOINT_UPDATE_ROUNDS to
enforce exactly MAX_CHECKPOINT_UPDATE_ROUNDS iterations; refer to the
functions/variables update_checkpoint_dependencies,
compute_completed_checkpoint_jobs, check_is_output_present,
get_checkpoint_target_inputs, maybe_has_incomplete_consumers,
consumers_prior/consumers_posterior, MAX_CHECKPOINT_UPDATE_ROUNDS and the
exit(1) call to locate changes.
- Around line 2210-2216: The current convergence check only detects removed
inputs by using target_inputs_prior.difference(target_inputs_posterior) > 0;
change it to detect any change (additions or removals) between
target_inputs_prior and target_inputs_posterior — e.g., compare the two sets for
inequality (target_inputs_prior != target_inputs_posterior) or use
symmetric_difference — so that when consumer.updated() adds new checkpoint
inputs it also sets maybe_has_incomplete_consumers = True.
- Around line 2159-2170: The logic for deciding when to verify checkpoint
outputs is inverted: in compute_completed_checkpoint_jobs the local flag
check_is_output_present should be true only when inspecting all jobs (jobs is
None) so output existence is checked then, and false when a specific list of
finished jobs is provided (jobs is not None) to avoid unnecessary I/O. Change
the assignment of check_is_output_present from (jobs is not None) to (jobs is
None) so the condition (not check_is_output_present or await
is_output_present(job)) skips output checks for passed-in finished jobs and only
performs them when jobs is None.
- Around line 2243-2248: The loop termination check uses `if i >
MAX_CHECKPOINT_UPDATE_ROUNDS:` which allows one extra iteration; change it to
`if i >= MAX_CHECKPOINT_UPDATE_ROUNDS:` to enforce the intended limit, and
replace the direct `exit(1)` with raising an exception (e.g., raise
RuntimeError(...) or a Snakemake-specific exception) instead of calling `exit`,
so callers can catch/handle the failure; keep the `logger.error(...)` message
but raise with the same text or include the underlying context.
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
2231-2234: Inefficient map object creation inside loopLine 2232 creates a new
mapobject on each iteration to extract rules fromconsumers_prior, which is inefficient.♻️ Proposed refactor
+ prior_rules = set(job.rule for job in consumers_prior) for consumer in consumers_posterior: - if consumer.rule not in map(lambda job: job.rule, consumers_prior): + if consumer.rule not in prior_rules: maybe_has_incomplete_consumers = True
📜 Review details
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/snakemake/dag.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of theselfargument of methods.
Do not suggest type annotation of theclsargument of classmethods.
Do not suggest return type annotation if a function or method does not contain areturnstatement.
Files:
src/snakemake/dag.py
🧠 Learnings (11)
📓 Common learnings
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
📚 Learning: 2024-10-14T09:42:11.571Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3140
File: snakemake/dag.py:1308-1308
Timestamp: 2024-10-14T09:42:11.571Z
Learning: In `snakemake/dag.py`, avoid flagging missing lines or indentation issues when there is no clear syntax or logical error to prevent false positives.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-11-12T12:08:20.342Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3148
File: snakemake/dag.py:1332-1336
Timestamp: 2024-11-12T12:08:20.342Z
Learning: In `snakemake/dag.py`, when code is outdated and will disappear upon resolving merge conflicts, avoid making code review suggestions on that code.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-06T14:09:54.370Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3117
File: tests/test_wrapper/Snakefile:11-11
Timestamp: 2024-10-06T14:09:54.370Z
Learning: Changes made within test cases, such as in `tests/test_wrapper/Snakefile`, are for testing purposes and do not require updates to the project documentation.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-05-23T09:40:24.474Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3600
File: src/snakemake/jobs.py:960-964
Timestamp: 2025-05-23T09:40:24.474Z
Learning: In the `cleanup` method of the `Job` class in `src/snakemake/jobs.py`, files in the `to_remove` list should be formatted with `fmt_iofile` without specifying `as_output=True` or `as_input=True` parameters, as these files should be displayed as generic files rather than specifically as output files.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-05-06T01:37:23.382Z
Learnt from: kdm9
Repo: snakemake/snakemake PR: 3562
File: src/snakemake/checkpoints.py:90-90
Timestamp: 2025-05-06T01:37:23.382Z
Learning: In Snakemake checkpoints implementation, tracking only the first missing output for each checkpoint is sufficient, because if one output is missing, all outputs for that checkpoint are considered incomplete. This was the behavior before PR #3562 and maintained in the pluralized `checkpoints.get()` implementation.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2025-01-14T14:04:30.554Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3107
File: snakemake/logging.py:510-515
Timestamp: 2025-01-14T14:04:30.554Z
Learning: In Snakemake's logging handlers, KeyboardInterrupt and SystemExit exceptions should be ignored (pass) rather than re-raised, as these are control flow signals that should not disrupt the logging process. The exception variable in the final except block should be kept for potential debugging purposes.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-29T09:26:26.636Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3171
File: snakemake/cli.py:106-145
Timestamp: 2024-10-29T09:26:26.636Z
Learning: In the `snakemake` codebase, exception chaining (using `raise ... from ...`) is avoided due to backward compatibility reasons. Do not suggest using exception chaining in `snakemake` codebase.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2024-10-06T14:09:26.494Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3117
File: snakemake/deployment/conda.py:0-0
Timestamp: 2024-10-06T14:09:26.494Z
Learning: In `snakemake/deployment/conda.py`, the exception handling in the `_get_version` method is intentional. The method raises a `WorkflowError` when the version cannot be determined, and this behavior is desired. Do not suggest modifying this exception handling.
Applied to files:
src/snakemake/dag.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.
Applied to files:
src/snakemake/dag.py
🧬 Code graph analysis (1)
src/snakemake/dag.py (1)
src/snakemake/checkpoints.py (2)
created_output(21-22)created_output(41-42)
🔇 Additional comments (3)
src/snakemake/dag.py (3)
394-396: LGTM: Assertion formattingThe assertion reformatting is clean and preserves the original logic.
2755-2764: LGTM: Mermaid.js formatting adjustmentThe reformatting of the return statement is clean and produces identical output.
3070-3070: LGTM: Archive log message formattingThe single-line string is cleaner and maintains the same message content.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/snakemake/dag.py (2)
2215-2216:⚠️ Potential issue | 🟠 MajorIncomplete convergence detection: only checks removed inputs.
The condition
target_inputs_prior.difference(target_inputs_posterior) > 0only detects when checkpoint target inputs are removed. It does not detect when inputs are added, which could also indicate incomplete checkpoint resolution requiring another iteration.🐛 Proposed fix
- if len(target_inputs_prior.difference(target_inputs_posterior)) > 0: + if target_inputs_prior != target_inputs_posterior: maybe_has_incomplete_consumers = TrueThis checks for any change (additions or removals) in checkpoint target inputs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2215 - 2216, The current convergence check only flags removed inputs by using target_inputs_prior.difference(target_inputs_posterior); change it to detect any change between target_inputs_prior and target_inputs_posterior (additions or removals) — e.g., use symmetric difference or compare the sets for inequality — so the maybe_has_incomplete_consumers flag is set whenever the sets differ (refer to variables target_inputs_prior, target_inputs_posterior and the maybe_has_incomplete_consumers assignment in the same block).
2243-2247:⚠️ Potential issue | 🟠 MajorReplace
exit(1)with exception for proper error handling.Using
exit(1)terminates the entire Python process, which is problematic when Snakemake is used as a library or in contexts where graceful error handling is expected. This is inconsistent with Snakemake's error handling patterns elsewhere in the codebase.🐛 Proposed fix
if i > MAX_CHECKPOINT_UPDATE_ROUNDS: - logger.error( - f"Checkpoint update did not converge after {MAX_CHECKPOINT_UPDATE_ROUNDS} rounds" - ) - exit(1) + raise WorkflowError( + f"Checkpoint dependency update did not converge after {MAX_CHECKPOINT_UPDATE_ROUNDS} rounds. " + "This may indicate a cyclic checkpoint dependency or a bug in checkpoint resolution." + )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2243 - 2247, The code currently calls exit(1) when the checkpoint update loop exceeds MAX_CHECKPOINT_UPDATE_ROUNDS (the block in src/snakemake/dag.py checking if i > MAX_CHECKPOINT_UPDATE_ROUNDS); replace exit(1) with raising an exception (e.g., raise RuntimeError(f"Checkpoint update did not converge after {MAX_CHECKPOINT_UPDATE_ROUNDS} rounds")) so callers can catch and handle the error instead of forcibly terminating the process. Ensure the raised exception includes the same descriptive message and remove the exit(1) call.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/dag.py`:
- Around line 2231-2233: The current membership check uses consumer.rule vs
map(lambda job: job.rule, consumers_prior) which only compares rule names and
can miss distinct jobs from the same rule; change the membership test to compare
job identity (e.g., job object or unique job id) instead of rule name: build a
set of consumers_prior identities (for example set(consumers_prior) or set(j.id
for j in consumers_prior)) and then test if the current consumer (or
consumer.id) is in that set before setting maybe_has_incomplete_consumers,
updating the check around consumers_posterior, consumers_prior, and
maybe_has_incomplete_consumers accordingly.
---
Duplicate comments:
In `@src/snakemake/dag.py`:
- Around line 2215-2216: The current convergence check only flags removed inputs
by using target_inputs_prior.difference(target_inputs_posterior); change it to
detect any change between target_inputs_prior and target_inputs_posterior
(additions or removals) — e.g., use symmetric difference or compare the sets for
inequality — so the maybe_has_incomplete_consumers flag is set whenever the sets
differ (refer to variables target_inputs_prior, target_inputs_posterior and the
maybe_has_incomplete_consumers assignment in the same block).
- Around line 2243-2247: The code currently calls exit(1) when the checkpoint
update loop exceeds MAX_CHECKPOINT_UPDATE_ROUNDS (the block in
src/snakemake/dag.py checking if i > MAX_CHECKPOINT_UPDATE_ROUNDS); replace
exit(1) with raising an exception (e.g., raise RuntimeError(f"Checkpoint update
did not converge after {MAX_CHECKPOINT_UPDATE_ROUNDS} rounds")) so callers can
catch and handle the error instead of forcibly terminating the process. Ensure
the raised exception includes the same descriptive message and remove the
exit(1) call.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/dag.py`:
- Around line 2220-2261: The loop only seeds completed checkpoint discovery from
new job identities (new_jobs - seen_jobs), which misses cases where a
reevaluated consumer references an existing job's checkpoint or when update()
evicts a checkpoint output immediately; to fix, broaden the rescan seed each
iteration to include not only newly added jobs but also any jobs that now have
completed checkpoint outputs by re-folding self._evicted_checkpoint_outputs into
self.workflow.checkpoints.created_output (or otherwise tracking newly available
checkpoint outputs) inside the loop before calling
get_completed_checkpoint_jobs, and then call get_completed_checkpoint_jobs
against the union of relevant job ids (e.g., new_jobs and any jobs owning newly
created/evicted outputs) so flag_checkpoints_as_completed(),
get_checkpoint_consumers(), and subsequent replace_job/update_needrun rounds see
second-order checkpoints.
In `@tests/tests.py`:
- Around line 3113-3114: Replace the single invocation in test_checkpoints_2nd
with two consecutive runs against the same workdir so the second run exercises
reuse of persisted checkpoints: call run(dpath("test_checkpoints_2nd"),
check_results=False) once to create the outputs, then call
run(dpath("test_checkpoints_2nd"), check_results=False) a second time (same
workdir) to ensure the downstream DAG sees precomputed checkpoint artifacts;
keep the same function names (test_checkpoints_2nd, run) and the check_results
flag to control verification.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 69486be1-b089-4a52-95df-e7d8098194ab
📒 Files selected for processing (5)
src/snakemake/dag.pytests/test_checkpoints_2nd/Snakefiletests/test_checkpoints_2nd/alice.txttests/test_checkpoints_2nd/second.txttests/tests.py
✅ Files skipped from review due to trivial changes (1)
- tests/test_checkpoints_2nd/alice.txt
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
2207-2210: Consider addingstrict=Truetozip()for defensive coding.Per static analysis hint, adding
strict=Trueensures the iteration fails fast ifcheckpoint_jobsandresultsever have mismatched lengths due to a bug, rather than silently dropping elements.♻️ Proposed fix
results = await asyncio.gather( *(is_output_new_present(job) for job in checkpoint_jobs) ) - return [job for job, done in zip(checkpoint_jobs, results) if done] + return [job for job, done in zip(checkpoint_jobs, results, strict=True) if done]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2207 - 2210, Update the comprehension that pairs checkpoint_jobs with results to use zip(..., strict=True) so a length mismatch raises immediately; specifically modify the expression in the return statement that currently does return [job for job, done in zip(checkpoint_jobs, results) if done] to use zip(checkpoint_jobs, results, strict=True), leaving the awaited asyncio.gather call and the is_output_new_present references unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/dag.py`:
- Around line 2268-2271: The error message raised via WorkflowError after the
range(1, 101) loop reports an off-by-one count; change the formatted count from
{i-1} to {i} (or otherwise use the actual number of rounds executed) so the
message correctly states "100 rounds" when the loop completes, keeping the
existing raise and message text and referencing the range(1, 101) loop and the
WorkflowError raise site in src/snakemake/dag.py.
---
Nitpick comments:
In `@src/snakemake/dag.py`:
- Around line 2207-2210: Update the comprehension that pairs checkpoint_jobs
with results to use zip(..., strict=True) so a length mismatch raises
immediately; specifically modify the expression in the return statement that
currently does return [job for job, done in zip(checkpoint_jobs, results) if
done] to use zip(checkpoint_jobs, results, strict=True), leaving the awaited
asyncio.gather call and the is_output_new_present references unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4130f647-dd81-4536-a6d3-1df655a2adb9
📒 Files selected for processing (4)
src/snakemake/dag.pytests/test_checkpoints_2nd/Snakefiletests/test_checkpoints_2nd/bob.txttests/test_checkpoints_2nd/expected-results/second.txt
✅ Files skipped from review due to trivial changes (1)
- tests/test_checkpoints_2nd/bob.txt
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/test_checkpoints_2nd/Snakefile
There was a problem hiding this comment.
♻️ Duplicate comments (2)
src/snakemake/dag.py (2)
2264-2266:⚠️ Potential issue | 🟡 MinorThe non-convergence error underreports the retry count.
After
for i in range(1, 101)exhausts,iis100, so{i-1}reports 99 rounds instead of 100.🐛 Suggested fix
raise WorkflowError( - f"Checkpoint dependency update did not converge after {i-1} rounds. " + f"Checkpoint dependency update did not converge after {i} rounds. " "This may indicate a cycle in checkpoint dependencies." )🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2264 - 2266, The error message in the raise within src/snakemake/dag.py underreports the number of rounds because it uses {i-1} after the for i in range(1, 101) loop; change the f-string in the WorkflowError to report the actual attempted rounds (use i instead of i-1 or compute a rounds variable before raising) so the message correctly states 100 rounds when the loop exhausts; update the raise in that block (the WorkflowError construction) accordingly.
2233-2258:⚠️ Potential issue | 🟠 MajorRequeue consumers when a newly discovered checkpoint is immediately evicted.
_evicted_checkpoint_outputsis folded intocreated_outputonly before the firstaffected_jobsset is built. Ifreplace_job()introduces a checkpoint whose output already exists on disk,update()can evict that checkpoint in the same round and append its outputs to_evicted_checkpoint_outputs. At that point the consumer still carries a newcheckpoint_target, butaffected_jobsis rebuilt only from checkpoint jobs that remain inself.jobs, so the next iteration can exit without ever re-running that consumer against the newly known output.🔁 Suggested direction
for i in range(1, 101): logger.debug(f"Checkpoint dependency update round {i}") + next_affected_jobs = set() no_new_deps = True for affected_job in affected_jobs: updated_job = await affected_job.updated() - if no_new_deps and checkpoint_target_inputs_updated( - affected_job, updated_job - ): + if checkpoint_target_inputs_updated(affected_job, updated_job): no_new_deps = False + next_affected_jobs.add(updated_job) await self.replace_job(affected_job, updated_job, recursive=False) await self.update_needrun() + checkpoints_created_output.update(self._evicted_checkpoint_outputs) completed_checkpoint_jobs = await get_completed_checkpoint_jobs(self.jobs) flag_checkpoints_as_completed(completed_checkpoint_jobs) - affected_jobs = get_checkpoint_affected_jobs(completed_checkpoint_jobs) + affected_jobs = next_affected_jobs | get_checkpoint_affected_jobs( + completed_checkpoint_jobs + )You also need the same
_evicted_checkpoint_outputsfold before the initialif not affected_jobsfast path.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2233 - 2258, Fold any outputs evicted during update into the created-output set both before the initial affected_jobs fast-path and after each rescan so consumers that gain a new checkpoint_target and are immediately evicted get re-evaluated; specifically, call checkpoints_created_output.update(self._evicted_checkpoint_outputs) immediately before computing affected_jobs (before the initial if not affected_jobs) and again right after completed_checkpoint_jobs = await get_completed_checkpoint_jobs(self.jobs) (inside the loop, before recomputing affected_jobs), so replace_job/updated evictions are visible when building get_checkpoint_affected_jobs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/snakemake/dag.py`:
- Around line 2264-2266: The error message in the raise within
src/snakemake/dag.py underreports the number of rounds because it uses {i-1}
after the for i in range(1, 101) loop; change the f-string in the WorkflowError
to report the actual attempted rounds (use i instead of i-1 or compute a rounds
variable before raising) so the message correctly states 100 rounds when the
loop exhausts; update the raise in that block (the WorkflowError construction)
accordingly.
- Around line 2233-2258: Fold any outputs evicted during update into the
created-output set both before the initial affected_jobs fast-path and after
each rescan so consumers that gain a new checkpoint_target and are immediately
evicted get re-evaluated; specifically, call
checkpoints_created_output.update(self._evicted_checkpoint_outputs) immediately
before computing affected_jobs (before the initial if not affected_jobs) and
again right after completed_checkpoint_jobs = await
get_completed_checkpoint_jobs(self.jobs) (inside the loop, before recomputing
affected_jobs), so replace_job/updated evictions are visible when building
get_checkpoint_affected_jobs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 47c813c2-a9f9-42a4-aa61-b4fe27d9dbff
📒 Files selected for processing (2)
src/snakemake/dag.pytests/tests.py
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
2206-2209: Consider addingstrict=Truetozip()for defensive coding.While
checkpoint_jobsandresultsare guaranteed to have matching lengths by construction (sinceresultscomes fromasyncio.gatherover the samecheckpoint_jobs), addingstrict=Trueprovides an explicit safety net against future refactoring that might break this invariant.♻️ Suggested change
results = await asyncio.gather( *(is_output_new_present(job) for job in checkpoint_jobs) ) - return [job for job, done in zip(checkpoint_jobs, results) if done] + return [job for job, done in zip(checkpoint_jobs, results, strict=True) if done]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2206 - 2209, The zip between checkpoint_jobs and results should be made defensive: change the comprehension that currently does return [job for job, done in zip(checkpoint_jobs, results) if done] to use zip(checkpoint_jobs, results, strict=True) to ensure mismatched lengths raise immediately; update the call site where asyncio.gather(...) produces results (and the related is_output_new_present usage) so the returned pairwise filtering uses zip(..., strict=True).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/snakemake/dag.py`:
- Around line 2206-2209: The zip between checkpoint_jobs and results should be
made defensive: change the comprehension that currently does return [job for
job, done in zip(checkpoint_jobs, results) if done] to use zip(checkpoint_jobs,
results, strict=True) to ensure mismatched lengths raise immediately; update the
call site where asyncio.gather(...) produces results (and the related
is_output_new_present usage) so the returned pairwise filtering uses zip(...,
strict=True).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: abed4a94-17d4-41fe-a469-785c3e86f2a4
📒 Files selected for processing (1)
src/snakemake/dag.py
There was a problem hiding this comment.
♻️ Duplicate comments (1)
src/snakemake/dag.py (1)
2234-2243:⚠️ Potential issue | 🔴 CriticalEvicted checkpoint outputs still don't schedule a second pass.
When a checkpoint was evicted earlier (Lines 1361-1373), its job and dependency edges are already gone. Folding
_evicted_checkpoint_outputsintocreated_outputhere therefore does not produce anyaffected_jobs, so this path can hit Line 2242 and return with jobs that still carrycheckpoint_targetinputs unresolved. The same blind spot exists for evicted outputs discovered inside later rounds, because they are only folded once before the loop. Seed reevaluation from jobs withcheckpoint_targetinputs whenever evicted outputs are merged, and repeat that merge inside each round.🔧 Minimal shape of the missing seed
+ def get_jobs_with_checkpoint_targets(): + return { + job + for job in self.jobs + if any(is_flagged(f, "checkpoint_target") for f in job.input) + } + + def fold_evicted_checkpoint_outputs(): + evicted, self._evicted_checkpoint_outputs = ( + self._evicted_checkpoint_outputs, + set(), + ) + checkpoints_created.update(evicted) + return evicted + - evicted, self._evicted_checkpoint_outputs = self._evicted_checkpoint_outputs, set() - checkpoints_created.update(evicted) - affected_jobs = get_checkpoint_affected_jobs(completed_checkpoint_jobs) + evicted = fold_evicted_checkpoint_outputs() + affected_jobs = get_checkpoint_affected_jobs(completed_checkpoint_jobs) + if evicted: + affected_jobs |= get_jobs_with_checkpoint_targets() ... await self.update_needrun() + evicted = fold_evicted_checkpoint_outputs() completed_checkpoint_jobs = await get_completed_checkpoint_jobs(self.jobs) update_checkpoints_created(completed_checkpoint_jobs) affected_jobs = get_checkpoint_affected_jobs(completed_checkpoint_jobs) + if evicted: + affected_jobs |= get_jobs_with_checkpoint_targets()Also applies to: 2258-2262
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 2234 - 2243, The merge of _evicted_checkpoint_outputs into checkpoints_created happens only once, so if evicted outputs remove jobs earlier their dependent jobs with checkpoint_target inputs never get re-seeded; move the swap-and-merge logic so it runs whenever evicted outputs are discovered (i.e., perform evicted, self._evicted_checkpoint_outputs = self._evicted_checkpoint_outputs, set() and checkpoints_created.update(evicted) inside each DAG traversal round), and after each such merge call get_checkpoint_affected_jobs(completed_checkpoint_jobs or completed_checkpoint_jobs + evicted) to compute affected_jobs and enqueue them instead of returning early; ensure code paths that currently return False (when not affected) do not exit the round prematurely but continue until no new evicted outputs are produced.
🧹 Nitpick comments (1)
src/snakemake/dag.py (1)
132-138: Use a mutable type for the DAG adjacency tables.These fields are deleted and updated throughout
DAG(30+ mutation sites), so annotating them asMapping[...]makes the types lie about the implementation and weakens static checking. UseMutableMapping[...]or the concretedefaultdict[...]shape on the private attributes instead.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/dag.py` around lines 132 - 138, The type annotations for the adjacency tables _dependencies and depending are wrong: they are mutated extensively but are declared as immutable Mapping[...] which weakens static checking; update their annotations to a mutable type (e.g., MutableMapping[Job, MutableMapping[Job, Set[str]]] or the concrete defaultdict shape) on the private attributes in class DAG so the type reflects mutation sites, and keep the runtime construction (defaultdict(partial(defaultdict, set))) unchanged; change only the type hints for _dependencies and depending to MutableMapping (or the explicit defaultdict nested mapping) to satisfy the type checker and callers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/snakemake/dag.py`:
- Around line 2234-2243: The merge of _evicted_checkpoint_outputs into
checkpoints_created happens only once, so if evicted outputs remove jobs earlier
their dependent jobs with checkpoint_target inputs never get re-seeded; move the
swap-and-merge logic so it runs whenever evicted outputs are discovered (i.e.,
perform evicted, self._evicted_checkpoint_outputs =
self._evicted_checkpoint_outputs, set() and checkpoints_created.update(evicted)
inside each DAG traversal round), and after each such merge call
get_checkpoint_affected_jobs(completed_checkpoint_jobs or
completed_checkpoint_jobs + evicted) to compute affected_jobs and enqueue them
instead of returning early; ensure code paths that currently return False (when
not affected) do not exit the round prematurely but continue until no new
evicted outputs are produced.
---
Nitpick comments:
In `@src/snakemake/dag.py`:
- Around line 132-138: The type annotations for the adjacency tables
_dependencies and depending are wrong: they are mutated extensively but are
declared as immutable Mapping[...] which weakens static checking; update their
annotations to a mutable type (e.g., MutableMapping[Job, MutableMapping[Job,
Set[str]]] or the concrete defaultdict shape) on the private attributes in class
DAG so the type reflects mutation sites, and keep the runtime construction
(defaultdict(partial(defaultdict, set))) unchanged; change only the type hints
for _dependencies and depending to MutableMapping (or the explicit defaultdict
nested mapping) to satisfy the type checker and callers.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 012762f7-2a15-4a91-a6d9-246b57ffed18
📒 Files selected for processing (1)
src/snakemake/dag.py
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/dag.py`:
- Around line 2218-2244: The code only builds affected_jobs from
completed_checkpoint_jobs and thus misses consumers that become satisfiable
because we merged _evicted_checkpoint_outputs into checkpoints_created; after
update_checkpoints_created(completed_checkpoint_jobs) add a pass that finds jobs
in self.jobs whose checkpoint_target inputs were previously unsatisfied but are
now satisfied by the newly-added evicted outputs (use
checkpoint_target_inputs_updated semantics: test whether a job's input set gains
checkpoint_target entries from the merged evicted set / checkpoints_created),
and include those jobs in affected_jobs (or extend the list returned by
get_checkpoint_affected_jobs) so consumers of evicted checkpoint outputs are
requeued for the next round; reference update_checkpoints_created,
_evicted_checkpoint_outputs, checkpoints_created,
checkpoint_target_inputs_updated, get_checkpoint_affected_jobs, and
completed_checkpoint_jobs when making the change.
- Around line 2205-2208: The zip over checkpoint_jobs and results can silently
truncate if the sequences ever differ; update the comprehension that returns
matched jobs (the list comprehension using zip(checkpoint_jobs, results)) to
call zip(checkpoint_jobs, results, strict=True) so the runtime will raise on
length mismatch and document the invariant between checkpoint_jobs and the
results produced by asyncio.gather calling is_output_new_present.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 24c02c41-3978-42ea-8644-3917bf78c186
📒 Files selected for processing (1)
src/snakemake/dag.py
|
@Hocnonsense can you have a look at the conflicts, because I think they are now with your merged PR from before. Hopefully you can distill the best of both PRs in the conflict resolution? |
|
Sure, I'll sort out the conflicts, and I'll let you know once it's done. @johanneskoester it's done now! |
Fixes #3862
QC
Since this is a bug fix, both points should be covered.
docs/) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).Summary by CodeRabbit
Refactor
Bug Fixes
Tests
Style