
Proposed solution to temp files that are not removed when using checkpoints #3715

@danpal96

Description


Snakemake version

snakemake main commit 26fcd38

Describe the bug

Regarding #2982 and related to #3218

I have a possible solution for the cases where temp files are not removed before all checkpoints are finalized, but I am not very familiar with the Snakemake codebase, so I am creating this issue in case I am misunderstanding something.

First, my example snakefile:

# a target rule to define the desired final output
rule all:
    input:
        "results/aggregated/a.txt",
        "results/aggregated/b.txt",
        "results/aggregated/c.txt",
        "results/aggregated/d.txt",
        "results/aggregated/e.txt",

rule temp:
    output:
        temp("results/temp/{sample}.txt")
    shell:
        # simulate some output value
        "echo {wildcards.sample} > {output:q}"

# the checkpoint that shall trigger re-evaluation of the DAG
checkpoint somestep:
    input:
        "results/temp/{sample}.txt"
    output:
        temp("results/somestep/{sample}.txt")
    shell:
        # simulate some output value
        "cat {input:q} > {output:q}"


# intermediate rule
rule intermediate:
    input:
        "results/somestep/{sample}.txt"
    output:
        temp("results/post/{sample}.txt")
    shell:
        "cat {input:q} > {output:q}"


# alternative intermediate rule
rule alt_intermediate:
    input:
        "results/somestep/{sample}.txt"
    output:
        temp("results/alt/{sample}.txt")
    shell:
        "cat {input:q} > {output:q}"


# input function for the rule aggregate
def aggregate_input(wildcards):
    # decision based on content of output file
    # Important: use the method open() of the returned file!
    # This way, Snakemake is able to automatically download the file if it is generated in
    # a cloud environment without a shared filesystem.
    with checkpoints.somestep.get(sample=wildcards.sample).output[0].open() as f:
        if f.read().strip() == "a":
            return "results/post/{sample}.txt"
        else:
            return "results/alt/{sample}.txt"


rule aggregate:
    input:
        aggregate_input
    output:
        "results/aggregated/{sample}.txt"
    shell:
        "cat {input:q} > {output:q}"

The expected result is that every file created, except a.txt, b.txt, c.txt, d.txt, and e.txt in results/aggregated/, should be deleted. The current behavior is that, in addition to those files, a.txt, c.txt, d.txt, and e.txt remain in results/temp/; the only file in results/temp/ that is removed is b.txt. The reason is that the finish method, which calls

async def handle_temp(self, job):

contains the following code:

snakemake/src/snakemake/dag.py

Lines 2271 to 2277 in 26fcd38

if not self.checkpoint_jobs:
    # While there are still checkpoint jobs, we cannot safely delete
    # temp files.
    # TODO: we maybe could be more accurate and determine whether there is a
    # checkpoint that depends on the temp file.
    for job in jobs:
        await self.handle_temp(job)

This code only removes temp files once there are no checkpoint jobs left, which is only true for the last job to finish; in this run that happens to be the job for results/temp/b.txt (I have no idea why it is b and not another sample).
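The effect of that guard can be illustrated with a small standalone simulation (plain Python, no Snakemake internals; the class and method names only mirror the snippet above): cleanup is skipped for every job that finishes while any checkpoint job is still pending, so only jobs finishing after the last checkpoint completes get cleaned.

```python
import asyncio

class MiniDAG:
    """Toy model of the guard in dag.py: temp cleanup runs only
    when no checkpoint jobs remain."""

    def __init__(self):
        self.checkpoint_jobs = set()
        self.removed = []  # stands in for actual file deletion

    async def handle_temp(self, job):
        self.removed.append(job)

    async def finish(self, jobs):
        # mirrors the guard from the snippet above
        if not self.checkpoint_jobs:
            for job in jobs:
                await self.handle_temp(job)

async def demo():
    dag = MiniDAG()
    dag.checkpoint_jobs = {"cp_a", "cp_b"}
    dag.checkpoint_jobs.discard("cp_a")  # checkpoint a done, b pending
    await dag.finish(["temp_a"])         # skipped: a checkpoint remains
    dag.checkpoint_jobs.discard("cp_b")  # last checkpoint done
    await dag.finish(["temp_b"])         # only this job gets cleaned
    return dag.removed

print(asyncio.run(demo()))  # ['temp_b'] -- temp_a leaked
```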

My proposed solution is to add a list as an instance attribute of DAG that collects all the jobs whose temp files could not yet be handled, and to wait until the first finish call where it is safe to do the cleanup.

So, in __init__ add:

self._handle_temp_jobs = []

and on the finish method add:

        if not self.checkpoint_jobs:
            # While there are still checkpoint jobs, we cannot safely delete
            # temp files.
            # TODO: we maybe could be more accurate and determine whether there is a
            # checkpoint that depends on the temp file.
            for job in jobs:
                await self.handle_temp(job)
            while self._handle_temp_jobs:
                job = self._handle_temp_jobs.pop()
                await self.handle_temp(job)
        else:
            self._handle_temp_jobs.extend(jobs)
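Using the same kind of toy model as before (a sketch, not the real DAG class), the effect of this change is that deferred jobs are drained on the first finish call after the last checkpoint completes:

```python
import asyncio

class PatchedMiniDAG:
    """Toy model of the proposed change: jobs whose cleanup cannot
    run yet are parked in _handle_temp_jobs and drained later."""

    def __init__(self):
        self.checkpoint_jobs = set()
        self._handle_temp_jobs = []  # proposed deferred-cleanup list
        self.removed = []

    async def handle_temp(self, job):
        self.removed.append(job)

    async def finish(self, jobs):
        if not self.checkpoint_jobs:
            for job in jobs:
                await self.handle_temp(job)
            # drain jobs whose cleanup was deferred earlier
            while self._handle_temp_jobs:
                await self.handle_temp(self._handle_temp_jobs.pop())
        else:
            self._handle_temp_jobs.extend(jobs)

async def demo():
    dag = PatchedMiniDAG()
    dag.checkpoint_jobs = {"cp_b"}
    await dag.finish(["temp_a"])       # deferred: cp_b still pending
    dag.checkpoint_jobs.discard("cp_b")
    await dag.finish(["temp_b"])       # cleans temp_b, then drains temp_a
    return dag.removed

print(asyncio.run(demo()))  # ['temp_b', 'temp_a'] -- nothing leaked
```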

This will not address the TODO (I am not sure that is possible), and I think something like #3218 is still needed, but it will ensure that temp files are deleted once it is safe to do so.

I have tested this and it mostly works as expected in the example, except that some messages are duplicated:

Removing temporary output results/temp/b.txt.
Removing temporary output results/temp/c.txt.
Removing temporary output results/temp/a.txt.
Removing temporary output results/temp/d.txt.
Removing temporary output results/temp/e.txt.
Removing temporary output results/temp/d.txt.
Removing temporary output results/temp/e.txt.
Removing temporary output results/temp/a.txt.
Removing temporary output results/temp/b.txt.
Removing temporary output results/temp/c.txt.
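The duplicated messages presumably mean that the same job's temp files are handled more than once (once directly and once from the deferred list). One possible refinement, sketched here again as a toy model with a hypothetical _temp_handled set that is not part of the real codebase, would be to skip jobs that were already processed:

```python
import asyncio

class DedupMiniDAG:
    """Toy model: skip handle_temp for jobs already processed,
    avoiding duplicated 'Removing temporary output' messages."""

    def __init__(self):
        self.checkpoint_jobs = set()
        self._handle_temp_jobs = []
        self._temp_handled = set()  # hypothetical guard, not in Snakemake
        self.removed = []

    async def handle_temp(self, job):
        if job in self._temp_handled:
            return  # already cleaned; avoid a duplicate message
        self._temp_handled.add(job)
        self.removed.append(job)

    async def finish(self, jobs):
        if not self.checkpoint_jobs:
            for job in jobs:
                await self.handle_temp(job)
            while self._handle_temp_jobs:
                await self.handle_temp(self._handle_temp_jobs.pop())
        else:
            self._handle_temp_jobs.extend(jobs)

async def demo():
    dag = DedupMiniDAG()
    dag.checkpoint_jobs = {"cp"}
    await dag.finish(["temp_a"])            # deferred
    dag.checkpoint_jobs.clear()
    await dag.finish(["temp_a", "temp_b"])  # temp_a cleaned once only
    return dag.removed

print(asyncio.run(demo()))  # ['temp_a', 'temp_b'] -- no duplicates
```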

I will also create a PR.
