Checkpoint Aggregation Pattern - Snakemake running one sample at a time #16

@elsherbini

Description

I often write rules that may produce an empty output file, together with an aggregate step downstream that takes all non-empty samples and does something with them.

It appears the current checkpoint functionality doesn't fully support this: Snakemake runs one sample at a time and re-evaluates the DAG after each, rather than running all samples through the checkpoint rule simultaneously.

Here is a minimal reproducible example that shows the behavior:

import os

SAMPLES = ["a", "b", "c", "d"]

rule target:
    input: "output/aggregate_output"

# Sample "a" gets an empty file; the others get one line of content.
checkpoint maybe_empty:
    output: "output/{sample}.txt"
    run:
        with open(output[0], "w") as f:
            if wildcards.sample == "a":
                pass
            else:
                f.write("something\n")

def get_nonempty_files(wildcards):
    # Keep only the checkpoint outputs that are non-empty.
    return [
        "output/{}.txt".format(s)
        for s in SAMPLES
        if os.path.getsize(checkpoints.maybe_empty.get(sample=s).output[0]) > 0
    ]

rule aggregate:
    input:
        get_nonempty_files
    output: "output/aggregate_output"
    run:
        with open(output[0], "w") as f:
            f.write("\n".join(input))

It appears checkpoints.maybe_empty.get(sample=s) raises an exception for the first sample whose output is missing, so Snakemake runs the jobs needed for that one sample and re-evaluates the DAG. It then hits the exception for the second sample, runs that one, and so on.
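The serial re-evaluation described above can be modeled in plain Python. This is a toy model, not Snakemake internals; the exception class and `get` helper are hypothetical stand-ins, loosely inspired by the exception Snakemake raises from `checkpoints.<rule>.get` when a checkpoint has not run yet:

```python
# Toy model of why the input function discovers only one missing sample per
# DAG evaluation: the first get() on a missing output raises, so the list
# comprehension never reaches the later samples. All names are hypothetical.
class IncompleteCheckpoint(Exception):
    def __init__(self, sample):
        self.sample = sample

completed = set()  # checkpoint jobs that have already run

def get(sample):
    # Stand-in for checkpoints.maybe_empty.get(sample=...)
    if sample not in completed:
        raise IncompleteCheckpoint(sample)
    return "output/{}.txt".format(sample)

SAMPLES = ["a", "b", "c", "d"]
evaluations = 0
while True:
    evaluations += 1
    try:
        files = [get(s) for s in SAMPLES]  # the input function
        break
    except IncompleteCheckpoint as e:
        completed.add(e.sample)            # scheduler runs just that one job

print(evaluations)  # prints 5: one pass per missing sample + the final pass
```

With four samples the input function is evaluated five times, which matches the one-sample-at-a-time behavior observed in the real workflow.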

The behavior I’d want with a checkpoint like this is:

  • the first time the input function runs, figure out all the checkpoint outputs it depends on and run those.

  • then rerun the input function after all checkpoints have been completed.
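Until the scheduler can batch checkpoints this way, one possible workaround (a sketch only, assuming it is acceptable to produce all samples in a single job; rule and path names here are illustrative, not from the original workflow) is to restructure so that a single wildcard-free checkpoint writes every sample into a directory() output. A single call to .get() then triggers exactly one DAG re-evaluation for all samples:

```python
# Sketch: one checkpoint produces all sample files, so the DAG is
# re-evaluated only once instead of once per sample.
checkpoint make_all:
    output: directory("output/samples")
    run:
        os.makedirs(output[0], exist_ok=True)
        for s in SAMPLES:
            with open(os.path.join(output[0], s + ".txt"), "w") as f:
                if s != "a":
                    f.write("something\n")

def get_nonempty_files(wildcards):
    # One .get() call -> one re-evaluation covering every sample.
    outdir = checkpoints.make_all.get().output[0]
    return sorted(
        os.path.join(outdir, f)
        for f in os.listdir(outdir)
        if os.path.getsize(os.path.join(outdir, f)) > 0
    )
```

The trade-off is losing per-sample parallelism inside the checkpoint step, which is exactly what the requested behavior would avoid.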
