
"Second Order" checkpoints are not correctly handled #3862

@jackerschott

Description

Snakemake version
9.14.1 (latest at time of writing) and main branch version

Describe the bug
Let's start with a minimal example:

def derive_from_bob_output(wildcards):
    with checkpoints.bob.get().output[0].open() as f:
        return f.read().strip()


def derive_from_alice_output(wildcards):
    with checkpoints.alice.get().output[0].open() as f:
        return f.read().strip()


rule all:
    input:
        derive_from_bob_output


rule intermediate:
    input:
        "alice.txt",
        "bob.txt",


rule needs_alice:
    input:
        derive_from_alice_output
    output:
        touch("second.txt")
    shell:
        "test {input} = 'first.txt' || exit 1"


rule first:
    output:
        touch("first.txt")


checkpoint alice:
    output:
        "alice.txt"
    shell:
        "echo first.txt > {output}"


checkpoint bob:
    output:
        "bob.txt"
    shell:
        "echo second.txt > {output}"

Here we have two checkpoints, alice and bob; a rule, first, that produces first.txt; and a rule, needs_alice, that produces second.txt from the input first.txt, a file name derived from the output of alice.
We also have two target rules: intermediate, which requests all checkpoint outputs, and all, which requests second.txt, a file name derived from the output of bob.
The needs_alice rule also checks whether it received the correct input derived from alice's output.

First, snakemake --cores 1 --printshellcmds all runs without a problem.
Next, we remove the outputs with rm *.txt and execute the pipeline in two steps: first snakemake --cores 1 --printshellcmds intermediate, then snakemake --cores 1 --printshellcmds all. The first step again works without a problem, but the second one produces the following output:

[Fri Dec  5 11:40:08 2025]
localrule needs_alice:
    input: <TBD>
    output: second.txt
    jobid: 3
    reason: Missing output files: second.txt
    resources: tmpdir=/tmp
Shell command: test alice.txt = 'first.txt' || exit 1
RuleException:
CalledProcessError in file "/home/jona/tmp/snakemake_test/Snakefile", line 28:
Command 'set -euo pipefail;  test alice.txt = 'first.txt' || exit 1' returned non-zero exit status 1.
[Fri Dec  5 11:40:08 2025]
Error in rule needs_alice:
    message: None
    jobid: 3
    input: <TBD>
    output: second.txt
    shell:
        test alice.txt = 'first.txt' || exit 1
        (command exited with non-zero exit code)

This fails because needs_alice receives the input alice.txt instead of first.txt.
It is completely unclear to the user why this happens: alice.txt is the checkpoint output that derive_from_alice_output reads from, and its content is first.txt, not anything remotely resembling alice.txt.

Why does this happen? I tried to investigate a bit. The likely culprit is the strategy of injecting a checkpoint's output into the input of a dependent rule when that output has not been created yet, so that the checkpoint is executed automatically. I think the relevant code is this part of the expand_input function of the Rule class:

        def handle_incomplete_checkpoint(exception):
            """If checkpoint is incomplete, target it such that it is completed
            before this rule gets executed."""
            return exception.targetfile

We get an incomplete checkpoint exception because in the checkpoint class we have the following:

    def get(self, **wildcards):
        missing = self.rule.wildcard_names.difference(wildcards.keys())
        if missing:
            raise WorkflowError(
                "Missing wildcard values for {}".format(", ".join(missing))
            )

        output, _ = self.rule.expand_output(wildcards)
        if self.checkpoints.created_output:
            missing_output = set(output) - set(self.checkpoints.created_output)
            if not missing_output:
                return CheckpointJob(self.rule, output)
            else:
                logger.debug(
                    f"Missing checkpoint output for {self.rule.name} "
                    f"(wildcards: {wildcards}): {','.join(missing_output)} of {','.join(output)}"
                )

        raise IncompleteCheckpointException(self.rule, checkpoint_target(output[0]))

If the checkpoint output of alice is not registered in self.checkpoints.created_output, we automatically get an incomplete checkpoint exception, which would pass the alice output to the needs_alice rule.
Since the output already exists though, the checkpoint is never run and the re-evaluation of the derive_from_alice_output input function is never triggered.
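The interaction between get() and handle_incomplete_checkpoint can be reduced to a few lines. This is a hypothetical toy model, not Snakemake's real classes: ToyCheckpoint, expand_input and the files_on_disk dict (standing in for reading alice.txt) are illustrative assumptions. It shows that a checkpoint output that exists on disk but is not registered yields the checkpoint target itself as input.

```python
class IncompleteCheckpointException(Exception):
    """Toy stand-in for Snakemake's exception: carries the file to target."""

    def __init__(self, targetfile):
        super().__init__(targetfile)
        self.targetfile = targetfile


class ToyCheckpoint:
    """Hypothetical simplification of the quoted Checkpoint.get() logic."""

    def __init__(self, output, created_output):
        self.output = output
        self.created_output = created_output  # shared registry of outputs

    def get(self):
        # Only registered outputs count; existence on disk is not consulted.
        if self.output in self.created_output:
            return self.output
        raise IncompleteCheckpointException(self.output)


def expand_input(input_function):
    """Evaluate an input function, substituting the checkpoint target on failure."""
    try:
        return input_function()
    except IncompleteCheckpointException as exc:
        return exc.targetfile  # same idea as handle_incomplete_checkpoint


# alice.txt already exists "on disk" from the earlier intermediate run...
files_on_disk = {"alice.txt": "first.txt"}
created_output = set()  # ...but it was never registered
alice = ToyCheckpoint("alice.txt", created_output)


def derive_from_alice_output():
    return files_on_disk[alice.get()].strip()


stale_input = expand_input(derive_from_alice_output)  # the target itself
created_output.add("alice.txt")  # what the DAG update should have done
fresh_input = expand_input(derive_from_alice_output)  # the real file name
print(stale_input, fresh_input)  # alice.txt first.txt
```

Since nothing re-runs alice, the stale value is never replaced, which matches the alice.txt seen in the failing shell command.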
But why would alice's output be missing from self.checkpoints.created_output, even though alice ran previously and its output already exists? That is the main problem, and it lies in the update_checkpoint_dependencies function of the DAG class:

    async def update_checkpoint_dependencies(self, jobs=None):
        """Update dependencies of checkpoints."""

        async def is_output_present(job):
            return (self.finished(job) or not self.needrun(job)) and all(
                await asyncio.gather(*(out.exists() for out in job.output))
            )

        job_queue = defaultdict(set)
        if jobs is None:
            jobs = [
                job
                for job in self.jobs
                if job.is_checkpoint and await is_output_present(job)
            ]
        else:
            # called with finished jobs, no need to check if output is present
            jobs = [job for job in jobs if job.is_checkpoint]
        for job in jobs:
            for depending in self.depending[job]:
                job_queue[depending].add(job)
            self.workflow.checkpoints.created_output.update(job.output)

        updated = len(job_queue) > 0
        if updated:
            logger.info("Updating checkpoint dependencies.")

        i = 1
        while job_queue:
            logger.debug(f"Checkpoint dependency update round {i}")
            for noneedrun_checkpoint_deps in job_queue.values():
                for checkpoint in noneedrun_checkpoint_deps:
                    self.workflow.checkpoints.created_output.update(checkpoint.output)

            candidate_job_queue = defaultdict(set)
            for job in job_queue.keys():
                prior_checkpoint_targets = {
                    infile
                    for infile in job.input
                    if is_flagged(infile, "checkpoint_target")
                }
                updated_job = await job.updated()

                await self.replace_job(job, updated_job, recursive=False)

                posterior_checkpoint_targets = {
                    infile
                    for infile in updated_job.input
                    if is_flagged(infile, "checkpoint_target")
                    and infile not in prior_checkpoint_targets
                }
                posterior_checkpoint_deps = {
                    dep
                    for dep, files in self.dependencies[updated_job].items()
                    if dep.is_checkpoint
                    and not files.isdisjoint(posterior_checkpoint_targets)
                }

                if posterior_checkpoint_deps:
                    # there is at least one dep that is a new potentially
                    # unfinished checkpoint target
                    candidate_job_queue[updated_job].update(posterior_checkpoint_deps)

            job_queue = defaultdict(set)
            if candidate_job_queue:
                await self.update_needrun()
                for job, posterior_checkpoint_deps in candidate_job_queue.items():
                    for checkpoint in posterior_checkpoint_deps:
                        if not self.needrun(checkpoint):
                            job_queue[job].add(checkpoint)
            i += 1

        if updated:
            self.set_until_jobs()
            self.delete_omitfrom_jobs()
            await self.postprocess_after_update()

        return updated

Here the following happens:

  1. Gather all checkpoints in the current DAG whose outputs are present. So far, derive_from_bob_output has been evaluated once and returned bob.txt, so the only checkpoint in the DAG is bob.
  2. Add every job that depends on such a checkpoint to the job queue,
    together with the checkpoints it depends on (if their outputs are present);
    here this is bob, under the key all.
    Then loop:
  3. Go through all jobs in the job queue and mark the checkpoints they depend on as having
    created their outputs; this marks only bob.
  4. Go through all jobs in the job queue, update them (now that we know about the existing checkpoint outputs),
    and collect their new input files that are flagged as checkpoint targets. The only job in the queue is all, whose new input file is second.txt, which is not a checkpoint target. Updating all does add needs_alice (with the checkpoint target alice.txt as input) and alice to the DAG, but only the inputs of all itself are inspected.
  5. Find all checkpoints that the updated job depends on and that provide any of these new checkpoint targets, and add them to the candidate job queue under the updated job; there are no new checkpoint targets, so the candidate job queue stays empty.
  6. Update the needrun markers for all jobs in the DAG.
  7. Refill the job queue with the checkpoints from the candidate job queue that do not need to run (i.e. that already ran), keyed by the job that depends on them; the candidate job queue is empty, so the new job queue is empty as well and we break out of the loop. alice never gets marked as having created its output.
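The walkthrough can be replayed in a toy model. This is a hypothetical, heavily simplified sketch, not Snakemake code: plain dicts stand in for the DAG and jobs, files_on_disk stands in for the file system, build_job plays the role of creating a job (and its upstream jobs) with its input functions evaluated at that moment, and a checkpoint is assumed to need a run only if its output is missing.

```python
# Hypothetical toy model of the example workflow; not Snakemake's real classes.
files_on_disk = {"alice.txt": "first.txt", "bob.txt": "second.txt"}
checkpoints = {"alice": "alice.txt", "bob": "bob.txt"}  # checkpoint -> output
producers = {"alice.txt": "alice", "bob.txt": "bob",
             "first.txt": "first", "second.txt": "needs_alice"}


def checkpoint_input(cp, created_output):
    """Input-function result: real file if registered, else a flagged target."""
    out = checkpoints[cp]
    if out in created_output:
        return (files_on_disk[out], False)
    return (out, True)  # (file, is_checkpoint_target)


input_functions = {
    "all": lambda co: [checkpoint_input("bob", co)],
    "needs_alice": lambda co: [checkpoint_input("alice", co)],
    "first": lambda co: [],
    "alice": lambda co: [],
    "bob": lambda co: [],
}


def build_job(name, created_output, dag):
    """Add a job and its producers; inputs are frozen at creation time."""
    if name in dag:
        return
    dag[name] = input_functions[name](created_output)
    for f, _ in dag[name]:
        build_job(producers[f], created_output, dag)


def update_checkpoint_dependencies(dag, created_output):
    # Steps 1-2: checkpoints already in the DAG whose output is present.
    job_queue = {}
    for cp in [j for j in dag if j in checkpoints and checkpoints[j] in files_on_disk]:
        for job, inputs in dag.items():
            if any(f == checkpoints[cp] for f, _ in inputs):
                job_queue.setdefault(job, set()).add(cp)
        created_output.add(checkpoints[cp])
    while job_queue:
        # Step 3: register the outputs of queued checkpoints.
        for cps in job_queue.values():
            created_output.update(checkpoints[cp] for cp in cps)
        candidates = {}
        for job in list(job_queue):
            # Step 4: re-evaluate only this job's own input functions.
            prior = {f for f, is_target in dag[job] if is_target}
            del dag[job]
            build_job(job, created_output, dag)
            posterior = {f for f, is_target in dag[job] if is_target and f not in prior}
            # Step 5: checkpoints providing new targets of this job only.
            deps = {producers[f] for f in posterior if producers[f] in checkpoints}
            if deps:
                candidates[job] = deps
        # Steps 6-7: keep checkpoints that need no run (toy: output exists).
        job_queue = {}
        for job, deps in candidates.items():
            done = {cp for cp in deps if checkpoints[cp] in files_on_disk}
            if done:
                job_queue[job] = done


created_output = set()
dag = {}
build_job("all", created_output, dag)  # initial DAG: all <- bob
update_checkpoint_dependencies(dag, created_output)
print(sorted(created_output))  # ['bob.txt']
print(dag["needs_alice"])  # [('alice.txt', True)]
```

The run ends with alice.txt unregistered and needs_alice still holding the checkpoint target alice.txt, the same state that leads to the failing test alice.txt = 'first.txt' command.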

I think the error is the assumption that all checkpoint targets appear in the initial DAG, and never in upstream jobs that only enter the DAG after it has been updated at least once.
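One way to make this assumption concrete is a hypothetical consistency check (not part of Snakemake) that scans every job in the DAG, rather than only the re-evaluated job's own inputs, for checkpoint targets whose output already exists but was never registered. The dag_after_update literal is hand-written to match the state after step 7 of the walkthrough, using an assumed job -> [(file, is_checkpoint_target)] mapping.

```python
def find_unregistered_checkpoint_targets(dag, checkpoint_outputs, existing_files, created_output):
    """Return sorted (job, file) pairs where a flagged checkpoint target is used
    as input, the checkpoint output already exists, but it is not registered."""
    return sorted(
        (job, f)
        for job, inputs in dag.items()
        for f, is_target in inputs
        if is_target
        and f in checkpoint_outputs
        and f in existing_files
        and f not in created_output
    )


# Hypothetical DAG state after the update described in steps 1-7.
dag_after_update = {
    "all": [("second.txt", False)],
    "needs_alice": [("alice.txt", True)],
    "alice": [],
    "bob": [],
}
stale = find_unregistered_checkpoint_targets(
    dag_after_update,
    checkpoint_outputs={"alice.txt", "bob.txt"},
    existing_files={"alice.txt", "bob.txt"},
    created_output={"bob.txt"},
)
print(stale)  # [('needs_alice', 'alice.txt')]
```

Any non-empty result here marks a job whose input function would need to be re-evaluated once the corresponding checkpoint output is registered.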

This might seem like an obscure example, but I think heavy checkpoint usage quickly produces such a case. I discovered this bug while working on a data analysis pipeline with three different checkpoints that produce downstream wildcard sets, and I was running into this issue constantly, which makes using Snakemake for it an absolute pain. Either you settle on not using checkpoints, which means running the pipeline in three separate steps every time, or you have to deal with obscure, unpredictable errors that you can only fix by trying random things until something magically works.

I will likely try to fix this myself, because I really need it to work right now, but some expertise would be very welcome, since I don't want to break anything else while fixing it.

Metadata

Labels: bug (Something isn't working)
