Using get() on checkpoints in input function multiple times only schedules the checkpoint once #3283

@gabuzi

Hi!

I have a rather complicated workflow that produces lots of files with unpredictable names (mainly timestamps).
There is also a need to create output filenames based on the content of input files.
Checkpoints should be able to support this, but while trying to implement it, I stumbled upon some inconsistent behavior regarding checkpoints that I think qualifies as a bug.

Snakemake version

8.27.1

Describe the bug

If I call get() on a checkpoint multiple times in an input function, each time with different wildcard values, the checkpoint seems to be scheduled for execution only once. This causes a FileNotFoundError further down the line: the input function expects a checkpoint output to be present, but the file is missing because, apparently, the checkpoint was only scheduled for the first wildcard value whose output was accessed via get().
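The expected behavior can be illustrated with a toy model in plain Python. This is only a sketch under stated assumptions, not Snakemake's actual implementation: `ToyIncompleteCheckpoint`, `toy_get`, `toy_input_function`, and `resolve` are hypothetical names invented for illustration. The assumption is that the input function is re-evaluated after every finished checkpoint, and that each get() on an unfinished checkpoint requests that checkpoint, so both wildcard values end up scheduled.

```python
class ToyIncompleteCheckpoint(Exception):
    """Hypothetical stand-in signalling that a checkpoint still has to run."""

    def __init__(self, name):
        super().__init__(name)
        self.name = name


finished = set()  # wildcard values whose (toy) checkpoint has already run


def toy_get(name):
    """Return the output path of a finished checkpoint, else request it."""
    if name not in finished:
        raise ToyIncompleteCheckpoint(name)
    return f"filename~{name}.txt"


def toy_input_function(names):
    # Mirrors the list comprehension in the real input function:
    # the first unfinished checkpoint aborts the whole evaluation.
    return [toy_get(n) for n in names]


def resolve(names):
    """Re-evaluate the input function until no checkpoint is pending."""
    scheduled = []
    while True:
        try:
            return toy_input_function(names), scheduled
        except ToyIncompleteCheckpoint as exc:
            scheduled.append(exc.name)  # "run" the requested checkpoint
            finished.add(exc.name)


paths, scheduled = resolve(["one", "two"])
print(scheduled)  # ['one', 'two']
print(paths)      # ['filename~one.txt', 'filename~two.txt']
```

In this model every wildcard value is scheduled exactly once, in order. The bug described here corresponds to the second value never being requested.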

The good news is that manually raising an IncompleteCheckpointException with the appropriate rule and target file in the input function works around the issue! See the example below.

Minimal example

Reproduce the bug with snakemake all_bug, and run the version with the hacky workaround with snakemake all_working.

# Main entrypoint of the workflow. 
# Please follow the best practices: 
# https://snakemake.readthedocs.io/en/stable/snakefiles/best_practices.html,
# in particular regarding the standardized folder structure mentioned there.
import random
from pathlib import Path  # explicit import; Path is used in the input functions and in cp_one

from snakemake.exceptions import IncompleteCheckpointException


names = ["one", "two"]  # only using a single entry here works fine!


def input_bug(wildcards):
    outputs = [Path(checkpoints.cp_one.get(name=p).output[0]) for p in names]
    inputs = [f"content~{o.read_text()}_filename~{p}.txt" for o, p in zip(outputs, names)]
    return inputs

rule all_bug:
    input: input_bug

# Trying to fix with manually raising the exception
# This works!
def input_working(wildcards):
    outputs = [Path(checkpoints.cp_one.get(name=p).output[0]) for p in names]
    try:
        inputs = [f"content~{o.read_text()}_filename~{p}.txt" for o, p in zip(outputs, names)]
    except FileNotFoundError as e:
        print("File not found! Manually raising IncompleteCheckpointException", e)
        raise IncompleteCheckpointException(rules.cp_one, e.filename)
    return inputs

rule all_working:
    input: input_working

checkpoint cp_one:
    output:
        "filename~{name}.txt"
    run:
        Path(output[0]).write_text(str(random.randint(0, 9)))

rule two:
    input: "filename~{name}.txt"
    output:
        touch("content~{content}_filename~{name}.txt")
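
The workaround in input_working could also be factored into a small reusable helper. The following is a minimal sketch, not an official Snakemake API: `read_or_request` and `IncompleteCheckpointStandIn` are hypothetical names. The stand-in exception only mimics the `(rule, targetfile)` call used in the example above, so that the snippet runs without Snakemake. Inside a Snakefile one would presumably pass `IncompleteCheckpointException` and `rules.cp_one` instead.

```python
from pathlib import Path


class IncompleteCheckpointStandIn(Exception):
    """Hypothetical stand-in for snakemake.exceptions.IncompleteCheckpointException."""

    def __init__(self, rule, targetfile):
        super().__init__(f"checkpoint {rule!r} has not produced {targetfile!r}")
        self.rule = rule
        self.targetfile = targetfile


def read_or_request(path, rule, incomplete_exc=IncompleteCheckpointStandIn):
    """Read a checkpoint output, or signal that the checkpoint still has to run.

    A missing file is translated into `incomplete_exc(rule, filename)`,
    which is what the manual workaround does for every checkpoint output.
    """
    try:
        return Path(path).read_text()
    except FileNotFoundError as err:
        raise incomplete_exc(rule, err.filename) from err
```

With such a helper, the body of the input function would reduce to one `read_or_request(...)` call per checkpoint output, with no try/except block of its own.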

Labels: bug (Something isn't working)

Status: Done