Description
I often make rules that might produce an empty output file. I then have some aggregate step that wants to take all non-empty samples and do something to them.
It appears the current checkpoint functionality doesn't support this fully: Snakemake runs one sample at a time and re-evaluates the DAG after each one, rather than running all samples through the checkpoint rule simultaneously.
Here is a minimal reproducible example that shows the behavior:
```python
import os

SAMPLES = ["a", "b", "c", "d"]

rule target:
    input: "output/aggregate_output"

checkpoint maybe_empty:
    output: "output/{sample}.txt"
    run:
        with open(output[0], "w") as f:
            if wildcards.sample == "a":
                pass
            else:
                f.write("something\n")

def get_nonempty_files(wildcards):
    return ["output/{}.txt".format(s) for s in SAMPLES
            if os.path.getsize(checkpoints.maybe_empty.get(sample=s).output[0]) > 0]

rule aggregate:
    input:
        get_nonempty_files
    output: "output/aggregate_output"
    run:
        with open(output[0], "w") as f:
            f.write("\n".join(input))
```
It appears `checkpoints.maybe_empty.get(sample=s)` raises an exception for the first sample whose output is missing, so Snakemake runs the necessary steps for that one sample only. It then re-evaluates the input function, hits the exception for the second sample, and so on.
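To make the one-at-a-time behavior concrete, here is a simplified simulation of what I believe is happening (this is my own sketch, not Snakemake's actual implementation; the class and function names are made up stand-ins for `IncompleteCheckpointException`, `checkpoints.<name>.get()`, and the `maybe_empty` rule body):

```python
import os

class IncompleteCheckpointError(Exception):
    """Stand-in for the exception raised when a checkpoint output
    does not exist yet."""
    def __init__(self, path):
        self.path = path

def checkpoint_get(path):
    # Mimics checkpoints.maybe_empty.get(): raise if the output is missing.
    if not os.path.exists(path):
        raise IncompleteCheckpointError(path)
    return path

def run_checkpoint(path, sample):
    # Stand-in for the maybe_empty rule body: sample "a" produces an empty file.
    with open(path, "w") as f:
        if sample != "a":
            f.write("something\n")

def evaluate_dag(workdir, samples):
    """Simplified re-evaluation loop: each failed get() schedules ONE job,
    after which the input function is evaluated again from scratch."""
    rounds = 0
    while True:
        rounds += 1
        try:
            nonempty = []
            for s in samples:
                p = checkpoint_get(os.path.join(workdir, "{}.txt".format(s)))
                if os.path.getsize(p) > 0:
                    nonempty.append(p)
            return nonempty, rounds
        except IncompleteCheckpointError as e:
            sample = os.path.basename(e.path)[:-len(".txt")]
            run_checkpoint(e.path, sample)  # runs just this one sample
```

With four samples this takes five passes over the input function (one failed pass per sample, plus the final successful one), which matches the serial behavior described above.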
The behavior I'd want with a checkpoint like this is:

- the first time the input function runs, figure out all the checkpoint outputs it depends on and run those.
- then rerun the input function after all checkpoints have been completed.
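The desired two-phase behavior could be sketched like this (again my own simulation, not a proposed Snakemake API; `run_checkpoint` stands in for the `maybe_empty` rule body):

```python
import os

def run_checkpoint(path, sample):
    # Stand-in for the maybe_empty rule body: sample "a" produces an empty file.
    with open(path, "w") as f:
        if sample != "a":
            f.write("something\n")

def evaluate_all_then_rerun(workdir, samples):
    """Desired behavior: collect EVERY missing checkpoint output on the
    first pass, run them all, then evaluate the input function once more."""
    paths = {s: os.path.join(workdir, "{}.txt".format(s)) for s in samples}
    # pass 1: the input function reports all missing outputs at once
    missing = [s for s, p in paths.items() if not os.path.exists(p)]
    for s in missing:  # these jobs could all run simultaneously
        run_checkpoint(paths[s], s)
    # pass 2: rerun the input function after all checkpoints have completed
    return [p for p in paths.values() if os.path.getsize(p) > 0]
```

Instead of one DAG re-evaluation per sample, this needs only one extra pass regardless of how many samples feed the checkpoint.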