Description
Snakemake version
snakemake main commit 26fcd38
Describe the bug
Regarding #2982 and related to #3218
I have a possible solution for cases where temp files are not removed because their cleanup is attempted before all checkpoints are finalized. I am not very familiar with the Snakemake codebase, though, so I am opening this issue in case I am misunderstanding something.
First, my example Snakefile:
```python
# a target rule to define the desired final output
rule all:
    input:
        "results/aggregated/a.txt",
        "results/aggregated/b.txt",
        "results/aggregated/c.txt",
        "results/aggregated/d.txt",
        "results/aggregated/e.txt",


rule temp:
    output:
        temp("results/temp/{sample}.txt")
    shell:
        # simulate some output value
        "echo {wildcards.sample} > {output:q}"


# the checkpoint that shall trigger re-evaluation of the DAG
checkpoint somestep:
    input:
        "results/temp/{sample}.txt"
    output:
        temp("results/somestep/{sample}.txt")
    shell:
        # simulate some output value
        "cat {input:q} > {output:q}"


# intermediate rule
rule intermediate:
    input:
        "results/somestep/{sample}.txt"
    output:
        temp("results/post/{sample}.txt")
    shell:
        "cat {input:q} > {output:q}"


# alternative intermediate rule
rule alt_intermediate:
    input:
        "results/somestep/{sample}.txt"
    output:
        temp("results/alt/{sample}.txt")
    shell:
        "cat {input:q} > {output:q}"


# input function for the rule aggregate
def aggregate_input(wildcards):
    # decision based on content of output file
    # Important: use the method open() of the returned file!
    # This way, Snakemake is able to automatically download the file if it is generated in
    # a cloud environment without a shared filesystem.
    with checkpoints.somestep.get(sample=wildcards.sample).output[0].open() as f:
        if f.read().strip() == "a":
            return "results/post/{sample}.txt"
        else:
            return "results/alt/{sample}.txt"


rule aggregate:
    input:
        aggregate_input
    output:
        "results/aggregated/{sample}.txt"
    shell:
        "cat {input:q} > {output:q}"
```

The expected result is that every created file except a.txt, b.txt, c.txt, d.txt and e.txt in results/aggregated/ is deleted. Currently, in addition to these files, results/temp/ still contains a.txt, c.txt, d.txt and e.txt, which have not been deleted; b.txt is the only file in results/temp/ that is removed. The reason for this is that the `finish` method that calls
`handle_temp` (defined at line 936 of `snakemake/src/snakemake/dag.py` in commit 26fcd38) only does so when there are no checkpoint jobs left. This is the relevant part of `finish` (lines 2271 to 2277 of the same file and commit):

```python
if not self.checkpoint_jobs:
    # While there are still checkpoint jobs, we cannot safely delete
    # temp files.
    # TODO: we maybe could be more accurate and determine whether there is a
    # checkpoint that depends on the temp file.
    for job in jobs:
        await self.handle_temp(job)
```

So temp files are only handled by a `finish` call made when no checkpoint jobs remain. In this example, that is only the case for results/temp/b.txt, whose job is the last to finish (I have no idea why it is b and not another sample).
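To make the effect of this gating concrete, here is a toy Python model. It is not Snakemake code: the `(job, is_checkpoint, temp_file)` tuples, the job names and the finish order are all hypothetical. Each finished job names the temp file its `handle_temp` call would consider, and cleanup is skipped while any checkpoint is still pending. It also assumes that a checkpoint stops counting as pending once it has finished.

```python
def current_cleanup(events):
    """Toy model of the current gating in DAG.finish (not Snakemake code).

    events: (job, is_checkpoint, temp_file) tuples in finish order, where
    temp_file is the temp file that handling this job would consider.
    Returns the temp files that actually get removed.
    """
    # assumption: every checkpoint in the list is pending until it finishes
    pending = {job for job, is_checkpoint, _ in events if is_checkpoint}
    removed = []
    for job, is_checkpoint, temp_file in events:
        if is_checkpoint:
            pending.discard(job)
        if not pending:
            # no checkpoint left: safe to clean up this job's temp file
            removed.append(temp_file)
        # otherwise the temp file is skipped and never revisited
    return removed


# hypothetical finish order: the producing "temp" jobs, then the checkpoints
order = ["a", "c", "d", "e", "b"]
events = [(f"temp_{s}", False, f"results/temp/{s}.txt") for s in order]
events += [(f"somestep_{s}", True, f"results/temp/{s}.txt") for s in order]

print(current_cleanup(events))  # ['results/temp/b.txt']
```

In this model only the file handled by the last checkpoint to finish is removed, which matches the observed behavior; which sample that is depends only on the (hypothetical) finish order.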
My proposed solution is to add a list as an instance attribute of `DAG` that stores all jobs whose temp files could not be handled yet, and to process them in the first `finish` call where it is safe to clean up.
So, in `__init__` add:

```python
self._handle_temp_jobs = []
```

and in the `finish` method add:

```python
if not self.checkpoint_jobs:
    # While there are still checkpoint jobs, we cannot safely delete
    # temp files.
    # TODO: we maybe could be more accurate and determine whether there is a
    # checkpoint that depends on the temp file.
    for job in jobs:
        await self.handle_temp(job)
    # also handle the jobs whose temp handling was postponed earlier
    while self._handle_temp_jobs:
        job = self._handle_temp_jobs.pop()
        await self.handle_temp(job)
else:
    # checkpoints are still pending: postpone temp handling for these jobs
    self._handle_temp_jobs.extend(jobs)
```

This does not address the TODO (I am not sure that is possible), and I think something like #3218 is still needed, but it ensures that temp files are deleted once it is safe to do so.
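The same toy model (again not Snakemake code, with hypothetical event tuples and finish order) can be extended with a list that plays the role of the proposed `self._handle_temp_jobs`: skipped entries are remembered instead of dropped, and the first call made once no checkpoint is pending flushes them.

```python
def deferred_cleanup(events):
    """Toy model of the proposed fix (not Snakemake code).

    events: (job, is_checkpoint, temp_file) tuples in finish order.
    Returns the temp files handled, in order, including repeats.
    """
    pending = {job for job, is_checkpoint, _ in events if is_checkpoint}
    deferred = []  # stands in for the proposed self._handle_temp_jobs
    removed = []
    for job, is_checkpoint, temp_file in events:
        if is_checkpoint:
            pending.discard(job)
        if not pending:
            removed.append(temp_file)
            # flush everything postponed while checkpoints were pending
            while deferred:
                removed.append(deferred.pop())
        else:
            # postpone instead of dropping
            deferred.append(temp_file)
    return removed


order = ["a", "c", "d", "e", "b"]
events = [(f"temp_{s}", False, f"results/temp/{s}.txt") for s in order]
events += [(f"somestep_{s}", True, f"results/temp/{s}.txt") for s in order]

handled = deferred_cleanup(events)
print(len(handled), sorted(set(handled)))
# 10 ['results/temp/a.txt', 'results/temp/b.txt', 'results/temp/c.txt', 'results/temp/d.txt', 'results/temp/e.txt']
```

Every temp file is now handled, but in this model each file shows up twice: once for the job that produced it and once for the checkpoint that consumed it. That is consistent with, though it does not prove, the duplicated messages reported below.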
I have tested this, and it mostly works as expected on the example, except that some messages are duplicated:

```
Removing temporary output results/temp/b.txt.
Removing temporary output results/temp/c.txt.
Removing temporary output results/temp/a.txt.
Removing temporary output results/temp/d.txt.
Removing temporary output results/temp/e.txt.
Removing temporary output results/temp/d.txt.
Removing temporary output results/temp/e.txt.
Removing temporary output results/temp/a.txt.
Removing temporary output results/temp/b.txt.
Removing temporary output results/temp/c.txt.
```

I will also create a PR.
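If the duplicates do come from two postponed jobs that refer to the same temp file (an unverified guess), one option is to make the flush idempotent by remembering which files were already handled. The sketch below adds such a guard to a toy model of the proposed fix; it is not Snakemake code, and `already_handled` as well as the event tuples are hypothetical names chosen for illustration.

```python
def deferred_cleanup_once(events):
    """Toy model of the proposed fix plus a per-file guard (not Snakemake code).

    events: (job, is_checkpoint, temp_file) tuples in finish order.
    Returns the temp files handled, each at most once.
    """
    pending = {job for job, is_checkpoint, _ in events if is_checkpoint}
    deferred = []             # stands in for the proposed self._handle_temp_jobs
    already_handled = set()   # hypothetical guard against double removal
    removed = []

    def handle(temp_file):
        # remove (here: record) each temp file at most once
        if temp_file not in already_handled:
            already_handled.add(temp_file)
            removed.append(temp_file)

    for job, is_checkpoint, temp_file in events:
        if is_checkpoint:
            pending.discard(job)
        if not pending:
            handle(temp_file)
            while deferred:
                handle(deferred.pop())
        else:
            deferred.append(temp_file)
    return removed


order = ["a", "c", "d", "e", "b"]
events = [(f"temp_{s}", False, f"results/temp/{s}.txt") for s in order]
events += [(f"somestep_{s}", True, f"results/temp/{s}.txt") for s in order]

print(deferred_cleanup_once(events))
# ['results/temp/b.txt', 'results/temp/e.txt', 'results/temp/d.txt', 'results/temp/c.txt', 'results/temp/a.txt']
```

A set keyed on the file keeps the flush order while making repeated handling harmless; in the real `DAG`, an equivalent check would have to sit wherever the removal is logged.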