feat: Allow storing snakemake metadata in files or databases #4012
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
📝 Walkthrough

Adds a pluggable, versioned persistence system: introduces PersistenceBase, FilePersistence, and DbPersistence; removes the old monolithic persistence module; wires new CLI options and WorkflowSettings to select the backend and DB URL; updates workflow initialization and tests to exercise both backends.
Sequence Diagram

```mermaid
sequenceDiagram
    participant User as CLI User
    participant CLI as CLI Parser
    participant Settings as WorkflowSettings
    participant Workflow as Workflow
    participant Backend as Persistence Backend
    User->>CLI: snakemake --persistence-backend db --persistence-backend-db-url <url>
    CLI->>CLI: parse arguments
    CLI->>Settings: construct with persistence_backend & persistence_backend_db_url
    Settings-->>Workflow: passed into Workflow constructor
    Workflow->>Workflow: inspect settings.persistence_backend
    alt backend == "db"
        Workflow->>Backend: instantiate DbPersistence(db_url=<url>)
    else backend == "file"
        Workflow->>Backend: instantiate FilePersistence()
    end
    Backend-->>Workflow: return PersistenceBase instance
    Workflow->>Workflow: store as _persistence
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes

🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (1 warning, 1 inconclusive)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Edited: "AbstractPersistence to allow separating persistence backends later on" → "PersistenceBase to allow separate persistence backends"
I wonder if we could abstract this even further, given that the methods usually do almost exactly the same thing, but the db backend just queries the db first and converts to/from representations…
Yup, so the "backends" basically just do their specific record reading/writing now, so mostly IO-related stuff; everything else is handled in the base class. Also added some autogenerated docstrings for some of the abstract methods to make it a bit clearer how to implement them.
Ah, and right now, I just enabled the db backend by default to see if the CI succeeds with that or not. |
Edited: "PersistenceBase to allow separate persistence backends" → "--persistence-backend [file|db] and --persistence-backend-db-url URL to allow storing snakemake metadata in files or databases"
Actionable comments posted: 7
🧹 Nitpick comments (2)
src/snakemake/persistence/file.py (2)
154-172: Redundant path computation.
`path` is computed twice (lines 155 and 158). The second computation is unnecessary.

♻️ Remove redundant line
```diff
 def _io_read(self, subject, id):
     path = self._record_path(subject, id)
     if not os.path.exists(path):
         return dict()
-    path = self._record_path(subject, id)
     with open(path, "r") as f:
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/persistence/file.py` around lines 154 - 172, In _io_read remove the redundant recomputation of path: compute path once by calling self._record_path(subject, id) at the top of the function and reuse that variable for the os.path.exists check and the subsequent open(), i.e. delete the second call to self._record_path(subject, id) so the function uses the single path variable throughout.
178-185: `@lru_cache` on instance method can cause memory leaks.

The `lru_cache` decorator holds references to `self`, preventing garbage collection of the `FilePersistence` instance. Since `FilePersistence` is typically long-lived per workflow and `_clear_cache()` exists, this may be acceptable, but consider using a cache that doesn't reference `self` if this class might be instantiated multiple times.

♻️ Alternative using explicit cache dictionary
```diff
+    def __init__(self, ...):
+        ...
+        self._record_cache = {}
+
-    @lru_cache()
     def _read_record_cached(self, key: str) -> Optional[MetadataRecord]:
+        if key in self._record_cache:
+            return self._record_cache[key]
         rec = self._io_read(self._metadata_path, key)
-        return (
+        result = (
             MetadataRecord(**{k: v for k, v in rec.items() if k in VALID_METADATA_KEYS})
             if rec
             else None
         )
+        self._record_cache[key] = result
+        return result

     def _clear_cache(self) -> None:
-        self._read_record_cached.cache_clear()
+        self._record_cache.clear()
         self._incomplete_cache = None
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/persistence/file.py` around lines 178 - 185, The `@lru_cache` on the instance method _read_record_cached retains references to self and can leak FilePersistence instances; replace it with an instance-level cache (e.g. self._read_record_cache dict) used inside _read_record_cached to store/return results keyed by the method's key (and include self._metadata_path if records vary by path), and update/extend the existing _clear_cache() to clear that dict; locate usages in the FilePersistence class (methods _read_record_cached and _clear_cache) and ensure thread-safety if needed (e.g., simple locking) when accessing the cache.
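As a rough illustration of that suggestion (the class body, `_io_read` signature, and `_record_cache` name here are placeholders, not the actual snakemake API), an instance-level dictionary keeps the cache's lifetime tied to the object:

```python
from typing import Optional


class FilePersistence:
    """Sketch: per-instance cache replacing @lru_cache on a method."""

    def __init__(self):
        self._record_cache = {}

    def _io_read(self, key: str) -> Optional[dict]:
        # Placeholder for reading a metadata record from disk.
        return {"target": key}

    def _read_record_cached(self, key: str) -> Optional[dict]:
        # The cache lives on the instance, so it is garbage-collected
        # together with the FilePersistence object.
        if key not in self._record_cache:
            self._record_cache[key] = self._io_read(key)
        return self._record_cache[key]

    def _clear_cache(self) -> None:
        self._record_cache.clear()
```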
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/cli.py`:
- Around line 1579-1585: The CLI help for the --persistence-backend-db-url
argument currently includes a password-bearing example DSN; update the help text
in the group_behavior.add_argument call for "--persistence-backend-db-url" to
remove the password from the example (e.g., use "postgresql://user@host/db" or
"postgresql://host/db") and add a short note directing users to provide
credentials via profiles or environment-based configuration instead of passing
secrets on the command line.
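A minimal sketch of what the fixed help text could look like, using plain argparse (the real option wiring in snakemake's cli.py differs; the example DSN and wording here are illustrative):

```python
import argparse

parser = argparse.ArgumentParser(prog="snakemake")
parser.add_argument(
    "--persistence-backend",
    choices=["file", "db"],
    default="file",
    help="Backend used to store workflow metadata.",
)
parser.add_argument(
    "--persistence-backend-db-url",
    metavar="URL",
    help=(
        "Database URL for the db backend, e.g. postgresql://user@host/db. "
        "Provide credentials via a profile or environment-based "
        "configuration, not on the command line."
    ),
)

args = parser.parse_args(
    ["--persistence-backend", "db", "--persistence-backend-db-url", "sqlite:///meta.db"]
)
```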
In `@src/snakemake/persistence/__init__.py`:
- Around line 173-185: The cleanup code that wraps shutil.rmtree and Path.unlink
currently raises WorkflowError without chaining, losing the original exception;
modify the exception handlers around the rmtree call (in the block handling
removal of conda env at variable address) and the yaml unlink block
(yaml_path.unlink()) to catch OSError (or Exception if broader) and re-raise
WorkflowError using exception chaining (raise WorkflowError(f"...: {e}") from e)
so the original filesystem exception and traceback are preserved.
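The exception-chaining pattern the comment asks for can be sketched like this (the `WorkflowError` stand-in and `remove_env_dir` helper are illustrative, not the actual snakemake code):

```python
import shutil
from pathlib import Path


class WorkflowError(Exception):
    """Stand-in for snakemake's WorkflowError."""


def remove_env_dir(env_dir: Path) -> None:
    try:
        shutil.rmtree(env_dir)
    except OSError as e:
        # `from e` chains the original OSError, so its type and
        # traceback survive in the raised WorkflowError.
        raise WorkflowError(f"Failed to remove conda env at {env_dir}: {e}") from e
```

With chaining, `err.__cause__` still holds the original filesystem exception, so nothing about the root cause is lost in the error report.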
In `@src/snakemake/persistence/db.py`:
- Around line 142-157: The code eagerly creates a stub MetadataRecordORM in
_mark_incomplete which leaves behind empty rows when metadata retention is
disabled; update _mark_incomplete to avoid creating a new MetadataRecordORM when
only marking incomplete (i.e., only session.get(MetadataRecordORM, key) and set
incomplete/external_jobid on existing record, do not instantiate a new
MetadataRecordORM), and also harden _unmark_incomplete to remove stub records
created solely for the incomplete marker (detect records with
record_format_version == 0 and no provenance/metadata fields and call
session.delete(record) instead of clearing incomplete) so
PersistenceBase.finished() / has_metadata() no longer observes dropped metadata.
In `@src/snakemake/persistence/file.py`:
- Around line 240-259: _write_locks currently overwrites
self._lockfile[lock_type] so repeated calls orphan earlier lock files; change
the storage to keep multiple files per type (e.g., make
self._lockfile[lock_type] a list) and append each new lockfile instead of
assigning, using something like self._lockfile.setdefault(lock_type,
[]).append(lockfile) in _write_locks; update _delete_locks to iterate all stored
paths for each lock_type (handle lists of paths, remove each file while keeping
the same OSError handling) and then clear the mapping.
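A sketch of the append-instead-of-assign bookkeeping (class and method names are placeholders; the real `_write_locks`/`_delete_locks` take different arguments):

```python
import os
from collections import defaultdict


class LockBook:
    """Sketch: track every lock file written per lock type."""

    def __init__(self):
        self._lockfiles = defaultdict(list)

    def write_lock(self, lock_type: str, path: str) -> None:
        with open(path, "w") as f:
            f.write(lock_type)
        # Append instead of assign, so earlier lock files are not orphaned
        # when the same lock type is written more than once.
        self._lockfiles[lock_type].append(path)

    def delete_locks(self) -> None:
        for paths in self._lockfiles.values():
            for path in paths:
                try:
                    os.remove(path)
                except OSError:
                    pass  # keep best-effort removal semantics
        self._lockfiles.clear()
```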
- Around line 138-152: In _io_delete, recdirs is computed as a path relative
to subject then passed to os.removedirs which treats paths relative to cwd;
change to call os.removedirs with the actual directory path of the removed
record (use os.path.dirname(recpath) or an absolute path derived from recpath)
instead of the relative recdirs, and keep the existing check so you only call
removedirs when the directory isn't "." to avoid removing unexpected
directories.
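The fix boils down to passing `os.removedirs` the record's actual directory rather than a subject-relative path; a standalone sketch (the `remove_record` helper is hypothetical):

```python
import os


def remove_record(recpath: str) -> None:
    """Delete a record file and prune now-empty parent directories."""
    os.remove(recpath)
    recdir = os.path.dirname(recpath)
    # os.removedirs resolves relative paths against the CWD, so pass the
    # record's real directory, not a path relative to the subject dir.
    if recdir and recdir != ".":
        try:
            os.removedirs(recdir)
        except OSError:
            pass  # directory not empty; leave it alone
```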
In `@src/snakemake/workflow.py`:
- Around line 917-924: The persistence selection currently passes a None db_url
and silently accepts unknown backends; update the code in the block that checks
self.workflow_settings.persistence_backend so that if the backend == "db" you
only include "db_url" in persistence_kwargs when
self.workflow_settings.persistence_backend_db_url is truthy (do not forward
None), and if the backend value is neither "db" nor "file" raise WorkflowError;
ensure you set persistence = DbPersistence (with persistence_kwargs possibly
empty) for "db" and persistence = FilePersistence for the explicit "file"
backend.
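The selection logic described above can be sketched as follows (returning class names as strings instead of the real `FilePersistence`/`DbPersistence` classes, purely for illustration):

```python
class WorkflowError(Exception):
    """Stand-in for snakemake's WorkflowError."""


def select_persistence(backend: str, db_url=None):
    """Pick a persistence backend; never forward None, reject unknowns."""
    if backend == "db":
        kwargs = {}
        if db_url:  # only include db_url when it is actually set
            kwargs["db_url"] = db_url
        return ("DbPersistence", kwargs)
    elif backend == "file":
        return ("FilePersistence", {})
    raise WorkflowError(f"Unknown persistence backend: {backend!r}")
```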
---
Nitpick comments:
In `@src/snakemake/persistence/file.py`:
- Around line 154-172: In _io_read remove the redundant recomputation of path:
compute path once by calling self._record_path(subject, id) at the top of the
function and reuse that variable for the os.path.exists check and the subsequent
open(), i.e. delete the second call to self._record_path(subject, id) so the
function uses the single path variable throughout.
- Around line 178-185: The `@lru_cache` on the instance method _read_record_cached
retains references to self and can leak FilePersistence instances; replace it
with an instance-level cache (e.g. self._read_record_cache dict) used inside
_read_record_cached to store/return results keyed by the method's key (and
include self._metadata_path if records vary by path), and update/extend the
existing _clear_cache() to clear that dict; locate usages in the FilePersistence
class (methods _read_record_cached and _clear_cache) and ensure thread-safety if
needed (e.g., simple locking) when accessing the cache.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 82593f8e-10d6-443e-a3ba-1edaad4bc1e1
⛔ Files ignored due to path filters (1)
pyproject.toml is excluded by !pyproject.toml
📒 Files selected for processing (10)
- src/snakemake/cli.py
- src/snakemake/persistence.py
- src/snakemake/persistence/__init__.py
- src/snakemake/persistence/db.py
- src/snakemake/persistence/file.py
- src/snakemake/settings/types.py
- src/snakemake/workflow.py
- tests/common.py
- tests/test_persistence.py
- tests/tests.py
💤 Files with no reviewable changes (1)
- src/snakemake/persistence.py
Please format your code with
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/snakemake/persistence/__init__.py (1)
483-517: Compute input checksums once per job.

These digests depend only on `job.input`, so the current placement repeats the same checksum I/O once for every output file.

Suggested refactor
```diff
+        checksums = {
+            str(infile): await infile.checksum(self.max_checksum_file_size)
+            for infile in job.input
+        }
+        checksums = {k: v for k, v in checksums.items() if v is not None}
+
         for f in job.output:
             key = self._get_key(f)
             endtime = (
                 (await f.mtime()).local_or_storage() if await f.exists() else fallback_time
             )
-
-            checksums = {
-                str(infile): await infile.checksum(self.max_checksum_file_size)
-                for infile in job.input
-            }
-            checksums = {k: v for k, v in checksums.items() if v is not None}
             record = self._read_record(key) or MetadataRecord()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/persistence/__init__.py` around lines 483 - 517, The loop over "for f in job.output" recomputes input file checksums for every output; move the checksum collection out of the loop and compute it once per job by evaluating the dict comprehension that builds "checksums" (based on job.input and self.max_checksum_file_size) before the loop, filter None values into "input_checksums", and then inside the loop assign record.input_checksums = input_checksums; adjust placement so symbols _get_key, _read_record, _write_record, MetadataRecord, RECORD_FORMAT_VERSION, and any uses of endtime/fallback_time remain unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/persistence/db.py`:
- Around line 112-164: _read_record_cached is currently unbounded and not
invalidated on mutations; add a bounded cache and ensure mutations clear it.
Change `@lru_cache`(maxsize=None) on _read_record_cached to a bounded size like
`@lru_cache`(maxsize=1024) (or another appropriate maxsize for your workload), and
call self._clear_cache() at the end of each mutating method (_write_record,
_delete_record, _mark_incomplete, _unmark_incomplete) so the cached results are
invalidated after inserts/updates/deletes; keep the existing _clear_cache
implementation that calls self._read_record_cached.cache_clear().
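The bounded-cache-plus-invalidation pattern can be sketched without SQLAlchemy (the `RecordStore` class, its dict-backed storage, and the maxsize value are all placeholders for the real ORM-backed methods):

```python
from functools import lru_cache


class RecordStore:
    """Sketch: bounded read cache, invalidated after every mutation."""

    def __init__(self):
        self._rows = {}
        # Bounded cache: large workflows no longer pin every touched
        # record in memory. Built per instance to avoid a shared cache.
        self._read_cached = lru_cache(maxsize=1024)(self._read)

    def _read(self, key):
        return self._rows.get(key)

    def read_record(self, key):
        return self._read_cached(key)

    def write_record(self, key, value):
        self._rows[key] = value
        self._clear_cache()  # invalidate so readers see fresh data

    def delete_record(self, key):
        self._rows.pop(key, None)
        self._clear_cache()

    def _clear_cache(self):
        self._read_cached.cache_clear()
```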
- Around line 76-98: The DbPersistence __init__ currently ignores the provided
path when db_url is omitted; change the logic so if db_url is None (or omitted)
you construct the sqlite URL from the instance path: compute db_root = self.path
or Path.cwd(), db_file = (Path(db_root) / ".snakemake" /
"metadata.db").resolve(), then set self.engine =
create_engine(f"sqlite:///{db_file}") so the metadata DB lives under the
provided persistence root; reference DbPersistence.__init__, the db_url
parameter, and the self.engine creation when making this change.
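The suggested fallback URL construction, as a standalone sketch (the helper name and the `.snakemake/metadata.db` location mirror the comment, but this is not the actual DbPersistence code):

```python
from pathlib import Path


def default_sqlite_url(persistence_root=None) -> str:
    """Build the sqlite URL under the persistence root when db_url is omitted."""
    db_root = Path(persistence_root) if persistence_root else Path.cwd()
    # Keep the metadata DB inside the provided persistence root.
    db_file = (db_root / ".snakemake" / "metadata.db").resolve()
    return f"sqlite:///{db_file}"
```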
- Around line 70-73: The current LockORM/table design is fine but _write_locks()
must not use session.merge() (an upsert) because that allows two concurrent
processes to overwrite each other's locks; change _write_locks() to attempt pure
inserts of new LockORM(...) rows (e.g., session.add_all or individual
session.add) and commit, catching sqlalchemy.exc.IntegrityError to detect
existing rows, then rollback and surface a lock-acquisition failure back to
PersistenceBase.lock(); remove/replace session.merge() calls so any duplicate
primary-key insert fails rather than upserting.
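The insert-only semantics can be shown with the stdlib sqlite3 module instead of SQLAlchemy (table layout and helper name are illustrative): a duplicate primary-key insert raises `IntegrityError`, surfacing contention instead of silently upserting.

```python
import sqlite3


def try_acquire_lock(conn: sqlite3.Connection, file_path: str) -> bool:
    """Insert-only lock write: duplicates fail rather than overwrite."""
    try:
        conn.execute("INSERT INTO locks (file_path) VALUES (?)", (file_path,))
        conn.commit()
        return True
    except sqlite3.IntegrityError:
        conn.rollback()
        return False  # another process already holds this lock
```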
---
Nitpick comments:
In `@src/snakemake/persistence/__init__.py`:
- Around line 483-517: The loop over "for f in job.output" recomputes input file
checksums for every output; move the checksum collection out of the loop and
compute it once per job by evaluating the dict comprehension that builds
"checksums" (based on job.input and self.max_checksum_file_size) before the
loop, filter None values into "input_checksums", and then inside the loop assign
record.input_checksums = input_checksums; adjust placement so symbols _get_key,
_read_record, _write_record, MetadataRecord, RECORD_FORMAT_VERSION, and any uses
of endtime/fallback_time remain unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e5300a9b-ac8a-4fe3-84a5-0100357dda73
📒 Files selected for processing (3)
- src/snakemake/persistence/__init__.py
- src/snakemake/persistence/db.py
- src/snakemake/workflow.py
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/snakemake/persistence/db.py (2)
195-199: ⚠️ Potential issue | 🔴 Critical: Use plain inserts for lock rows.

With `file_path` as the primary key, `session.merge()` turns duplicate lock acquisition into overwrite-or-insert. A concurrent process can therefore commit an already-held lock instead of surfacing contention. Use insert-only writes and fail on duplicates.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/persistence/db.py` around lines 195 - 199, The _write_locks method currently uses session.merge(LockORM(...)) which performs upsert semantics and can overwrite an existing lock; change it to perform insert-only writes so a duplicate primary-key insertion raises an IntegrityError and surfaces contention. Replace session.merge(...) with session.add(...) / session.add_all(...) (creating LockORM instances for each key) and let session.commit() raise on duplicates, or explicitly catch sqlalchemy.exc.IntegrityError around the commit in _write_locks to translate it into a clear lock-contention error; refer to the _write_locks method, LockORM model and Session/engine when making the change.
112-164: ⚠️ Potential issue | 🟠 Major: Bound and invalidate the metadata read cache.

`_read_record_cached()` can retain `None` or stale rows after `_write_record()`, `_delete_record()`, `_mark_incomplete()`, or `_unmark_incomplete()` because none of those paths clear it. `maxsize=None` also lets large workflows keep every touched record in memory.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/snakemake/persistence/db.py` around lines 112 - 164, _read_record_cached uses lru_cache(maxsize=None) and is never invalidated, so writes/deletes/marks leave stale or None results cached and unbounded cache can grow unbounded; change the decorator to a bounded size (e.g., `@lru_cache`(maxsize=1024)) and call self._clear_cache() at the end of _write_record, _delete_record (before return), _mark_incomplete, and _unmark_incomplete to invalidate the cache after any mutation so subsequent _read_record calls see fresh data.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/snakemake/persistence/db.py`:
- Around line 166-186: Both _filter_incomplete_keys and _get_external_jobids
build a single large IN(...) which can exceed SQLite bind limits; change each to
process keys in chunks (e.g., chunk_size = 1000) by iterating over chunks of
keys_list, issuing a separate select per chunk (same where predicates:
MetadataRecordORM.target.in_(chunk) + the existing incomplete/external_jobid
filters), collecting and unioning results into the returned set, and avoid early
returns so empty input still returns set(). Ensure you deduplicate across chunks
and use Session(self.engine) for the batched queries in the same way they are
currently opened.
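The chunking itself is independent of the database layer; a stdlib sketch of the batching described above (the in-memory membership test stands in for the per-chunk `SELECT ... WHERE target IN (...)`, and the chunk size of 1000 is just the value the comment suggests):

```python
def chunked(seq, size):
    """Yield successive slices of at most `size` items."""
    for i in range(0, len(seq), size):
        yield seq[i : i + size]


def filter_incomplete_keys(incomplete_targets, keys, chunk_size=1000):
    """Union per-chunk results so no single IN(...) exceeds bind limits."""
    result = set()
    for chunk in chunked(list(keys), chunk_size):
        # Stand-in for one SELECT per chunk with the same predicates.
        result |= {k for k in chunk if k in incomplete_targets}
    return result
```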
---
Duplicate comments:
In `@src/snakemake/persistence/db.py`:
- Around line 195-199: The _write_locks method currently uses
session.merge(LockORM(...)) which performs upsert semantics and can overwrite an
existing lock; change it to perform insert-only writes so a duplicate
primary-key insertion raises an IntegrityError and surfaces contention. Replace
session.merge(...) with session.add(...) / session.add_all(...) (creating
LockORM instances for each key) and let session.commit() raise on duplicates, or
explicitly catch sqlalchemy.exc.IntegrityError around the commit in _write_locks
to translate it into a clear lock-contention error; refer to the _write_locks
method, LockORM model and Session/engine when making the change.
- Around line 112-164: _read_record_cached uses lru_cache(maxsize=None) and is
never invalidated, so writes/deletes/marks leave stale or None results cached
and unbounded cache can grow unbounded; change the decorator to a bounded size
(e.g., `@lru_cache`(maxsize=1024)) and call self._clear_cache() at the end of
_write_record, _delete_record (before return), _mark_incomplete, and
_unmark_incomplete to invalidate the cache after any mutation so subsequent
_read_record calls see fresh data.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8cc08e0b-c7a9-4e53-bcc8-280320cb9ea3
📒 Files selected for processing (1)
src/snakemake/persistence/db.py
To add, in my old testing I ran into issues with subprocesses writing to persistence. It shouldn't occur now, but any workflow with
I'll be adding one more test, sorry ^^

@cademirch added a test that uses
🤖 I have created a release *beep* *boop*

## [9.17.0](v9.16.3...v9.17.0) (2026-03-13)

### Features

* Allow storing snakemake metadata in files or databases ([#4012](#4012)) ([dd75f31](dd75f31))
* Allow to specify comparison command per-unit test ([#3956](#3956)) ([b88171c](b88171c))
* job table ordered topologically when run is started ([#4018](#4018)) ([75cf506](75cf506))
* lambda functions for priority in rules ([#3253](#3253)) ([d2aa226](d2aa226))
* Make on... directive of modules accessible ([#4050](#4050)) ([e9f2e1c](e9f2e1c))

### Bug Fixes

* adjust conda tests to not fail on apple silicon; fix [#4040](#4040) ([#4049](#4049)) ([f5b0142](f5b0142))
* allow "--containerize apptainer" to output apptainer format instead of dockerfile ([#4030](#4030)) ([f5cac30](f5cac30))
* apptainer command not recognized when singularity is absent ([#4010](#4010)) ([b8162e2](b8162e2))
* capture stderr when tests fail ([#3995](#3995)) ([97d74ba](97d74ba))
* **docs:** make Data-dependent conditional execution a complete example ([#4043](#4043)) ([3a1d7f2](3a1d7f2))
* don't build the DAG when running unlock. Fixes [#4000](#4000) and [#198](#198) ([#4007](#4007)) ([acf79fd](acf79fd))
* Ensure pixi tasks may be run as advertised ([#4046](#4046)) ([88253c2](88253c2))
* fix checkpoint handling corner cases ([#3870](#3870) and [#3559](#3559)) ([#4015](#4015)) ([63f4257](63f4257))
* issue 3642 ([#4054](#4054)) ([76e6fc2](76e6fc2))
* issue 3815 ([#4026](#4026)) ([b0eec96](b0eec96))
* logging None in shellcmd context causes error ([#4064](#4064)) ([d0652cd](d0652cd))
* lookup function returns default value for empty DataFrame queries ([#4056](#4056)) ([f71de97](f71de97))
* make `cache: omit-software` a rule specific property ([#4085](#4085)) ([034a9e7](034a9e7))
* reduce number of tests leaving temporary files behind ([#4033](#4033)) ([a3a1c97](a3a1c97))
* regression in dynamic resource handling ([#4038](#4038)) ([f2c554a](f2c554a))
* somewhat shorter announce message ([#4080](#4080)) ([57efc71](57efc71))

### Performance Improvements

* switch reretry with tenacity; decouple container classes (with Python 3.7 compat for old scripts) from rest of the codebase (enabling moving to newer python versions) ([#4032](#4032)) ([ffb19e7](ffb19e7))

### Documentation

* Add AI-assisted contributions policy to contributing guidelines ([#4051](#4051)) ([dd70526](dd70526))
* **codebase:** Update & simplify plugin architecture section ([#4052](#4052)) ([176cf63](176cf63))
* Correct workflow.source_path() description in documentation ([#4036](#4036)) ([45883c5](45883c5))
* fixed wrong code example for collect() function ([#4037](#4037)) ([5c85ed8](5c85ed8))
* Minor docs improvements ([#4089](#4089)) ([29ea226](29ea226))
* switch to sphinx_design for tabs ([#3976](#3976)) ([9674614](9674614))
* typo in the migration table breaking a pip install command ([#4024](#4024)) ([66f9dda](66f9dda))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This PR allows choosing whether metadata stored in .snakemake/metadata is stored as files or in a database.
To this end, we expose `--persistence-backend [file|db]` and `--persistence-backend-db-url URL` CLI options to choose between the two.

There is NO conversion logic (between the file and db backends) implemented.
There is also NO db schema migration logic implemented, yet (will leave that as future work ;)).
Resolves #2969 (well, at least I hope so)