Description
Hi!
I have a rather complicated workflow that produces lots of files with unpredictable names (mainly timestamps).
There is also a need to create output filenames based on the content of input files.
Checkpoints should be able to support this, but while trying to implement it, I stumbled upon some inconsistent checkpoint behavior that I think qualifies as a bug.
Snakemake version
8.27.1
Describe the bug
If an input function gets the outputs of a checkpoint multiple times with different wildcard values, the checkpoint seems to be scheduled for execution only once. This later causes a FileNotFoundError where the input function expects a checkpoint output to be present but it is not: apparently, the checkpoint is only scheduled for execution the first time its outputs are accessed via get().
The good news is that manually raising an IncompleteCheckpointException with the appropriate rule and target file in the input function fixes the issue! See the example below.
Logs
Minimal example
Reproduce the bug with snakemake all_bug, and run the version with the hacky fix with snakemake all_working.
# Main entrypoint of the workflow.
# Please follow the best practices:
# https://snakemake.readthedocs.io/en/stable/snakefiles/best_practices.html,
# in particular regarding the standardized folder structure mentioned there.
import random
from pathlib import Path

from snakemake.exceptions import IncompleteCheckpointException

names = ["one", "two"]  # only using a single entry here works fine!


def input_bug(wildcards):
    # Request the checkpoint output once per name, then read each file.
    outputs = [Path(checkpoints.cp_one.get(name=p).output[0]) for p in names]
    inputs = [f"content~{o.read_text()}_filename~{p}.txt" for o, p in zip(outputs, names)]
    return inputs


rule all_bug:
    input: input_bug


# Trying to fix with manually raising the exception
# This works!
def input_working(wildcards):
    outputs = [Path(checkpoints.cp_one.get(name=p).output[0]) for p in names]
    try:
        inputs = [f"content~{o.read_text()}_filename~{p}.txt" for o, p in zip(outputs, names)]
    except FileNotFoundError as e:
        print("File not found! Manually raising IncompleteCheckpointException", e)
        raise IncompleteCheckpointException(rules.cp_one, e.filename)
    return inputs


rule all_working:
    input: input_working


checkpoint cp_one:
    output:
        "filename~{name}.txt"
    run:
        # Write a random digit so the downstream filename depends on file content.
        (Path(output[0])).write_text(str(random.randint(0, 9)))


rule two:
    input: "filename~{name}.txt"
    output:
        touch("content~{content}_filename~{name}.txt")

Additional context
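For illustration, here is a minimal, Snakemake-free sketch of the control flow that input_working relies on: read every expected file, and turn the first FileNotFoundError into a dedicated "this file still has to be produced" signal. MissingCheckpointOutput and read_outputs are hypothetical names made up for this sketch. They are not part of Snakemake, and the class only stands in for IncompleteCheckpointException. The sketch says nothing about how Snakemake schedules checkpoints internally.

```python
import tempfile
from pathlib import Path


class MissingCheckpointOutput(Exception):
    """Hypothetical stand-in for Snakemake's IncompleteCheckpointException."""

    def __init__(self, targetfile):
        super().__init__(f"checkpoint output not created yet: {targetfile}")
        self.targetfile = targetfile


def read_outputs(paths):
    """Read every path; report the first missing file via a dedicated exception."""
    contents = []
    for path in paths:
        try:
            contents.append(Path(path).read_text())
        except FileNotFoundError as e:
            # Translate the plain I/O error into a "produce this file first"
            # signal, mirroring the except branch of input_working.
            raise MissingCheckpointOutput(e.filename) from e
    return contents


def demo():
    """Simulate two checkpoint outputs of which only the first exists at first."""
    with tempfile.TemporaryDirectory() as tmp:
        one = Path(tmp) / "filename~one.txt"
        two = Path(tmp) / "filename~two.txt"
        one.write_text("3")  # only the first "checkpoint output" exists
        try:
            read_outputs([one, two])
        except MissingCheckpointOutput as e:
            missing = Path(e.targetfile).name
        two.write_text("7")  # simulate the second checkpoint run
        return missing, read_outputs([one, two])
```

Calling demo() first reports the second file as missing and then, once that file exists, returns the contents of both files. In the Snakefile above, the second run of the input function plays the same role after the checkpoint has produced the missing output.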