Skip to content

fix: reuse async runner to prevent port exhaustion#3911

Merged
johanneskoester merged 13 commits intosnakemake:mainfrom
coroa:feat/reuse-async-runner
Jan 15, 2026
Merged

fix: reuse async runner to prevent port exhaustion#3911
johanneskoester merged 13 commits intosnakemake:mainfrom
coroa:feat/reuse-async-runner

Conversation

@coroa
Copy link
Copy Markdown
Contributor

@coroa coroa commented Jan 8, 2026

Interestingly i debugged the same issue as reported in #3909 last week, and came to a slightly different implementation for reusing the asynchronuous loop.

The benefit is that it is more explicit, than the implicit reuse. The drawback is that you still have multiple async runners, since the async_run in InputFiles._predicated_size_files cannot access the workflow object, when it is being used through eval_resource_expression.

QC

  • The PR contains a test case for the changes or the changes are already covered by an existing test case.
  • The documentation (docs/) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).

Summary by CodeRabbit

  • Refactor

    • Centralized asynchronous dispatch into per-workflow runners with orderly creation and teardown, replacing scattered external async calls.
  • New Features

    • Workflow-managed async execution now drives scheduling, execution, caching, postprocessing and touch operations.
    • Resource/input/benchmark evaluation callables and input helpers can accept and propagate an async execution handle.
  • Bug Fixes

    • Fixed resource iteration extraction and simplified an IO-file error message for clearer output.

✏️ Tip: You can customize this high-level summary in your review settings.

@coroa coroa requested a review from johanneskoester as a code owner January 8, 2026 13:57
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jan 8, 2026

📝 Walkthrough

Walkthrough

Introduces per-workflow async runner infrastructure: adds async_runner() and Workflow.async_run with per-thread asyncio.Runners and shared executor; numerous call sites (executors, scheduler, IO, resources, rules) now route async work through the workflow's runner.

Changes

Cohort / File(s) Summary
Async infra
src/snakemake/common/__init__.py
Added async_runner(loop_factory=None, executor=None) to create an asyncio.Runner outside a running loop; async_run now delegates to runner API.
Workflow integration
src/snakemake/workflow.py
Added attributes _async_runners, _async_executor (ThreadPoolExecutor), _async_lock; added Workflow.async_run(self, coro) to manage per-thread runners; tear_down closes runners and shuts down executor; replaced public import of async_run with async_runner.
Executors
src/snakemake/executors/local.py, src/snakemake/executors/dryrun.py, src/snakemake/executors/touch.py
Replaced global async_run(...) with self.workflow.async_run(...); added async def acached_or_run(...) in local executor to await cache fetch/store via workflow runner.
Scheduler & job orchestration
src/snakemake/scheduling/job_scheduler.py
Replaced external async_run(...) calls with self.workflow.async_run(...) across scheduling, postprocess, input-size updates, error handling, and related flows; removed external async_run import.
I/O
src/snakemake/io/__init__.py
Switched to contextual lookup globals().get("async_run", async_run_fallback)(...); imported/exported async_run_fallback from snakemake.common.
Resources & rules
src/snakemake/resources.py, src/snakemake/rules.py
Propagated async_run into resource/rule callable factories and aux param dicts; updated generated callable signatures to accept async_run.
Minor fixes
src/snakemake/resources.py, src/snakemake/rules.py, src/snakemake/io/__init__.py
Fixed dict-comprehension unpack bug, simplified an error message, and adjusted imports/names to integrate new async paths.

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Scheduler/Executor/IO
    participant Workflow as Workflow.async_run
    participant Thread as Per-thread context
    participant Runner as asyncio.Runner (per-thread)
    participant Task as Coroutine (e.g., touch/fetch/postprocess)
    participant Storage as I/O or Cache

    Caller->>Workflow: async_run(coro)
    Workflow->>Thread: lock and select/create runner for thread
    Thread->>Runner: obtain runner
    Workflow->>Runner: runner.run(coro)
    Runner->>Task: execute coroutine
    Task->>Storage: await I/O/cache operation
    Storage-->>Task: return I/O result
    Task-->>Runner: complete with result
    Runner-->>Workflow: deliver result
    Workflow-->>Caller: return result
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

enhancement

Suggested reviewers

  • johanneskoester
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.36% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix: reuse async runner to prevent port exhaustion' directly and clearly summarizes the main change—it identifies the core problem being addressed (port exhaustion) and the solution (reusing async runner).
Description check ✅ Passed The description explains the implementation approach, acknowledges trade-offs, and confirms both test coverage and documentation updates were completed, aligning with the template requirements.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/snakemake/scheduling/job_scheduler.py (1)

374-378: Inconsistent: async_run() not replaced with centralized runner.

This call site still uses async_run() while all other call sites in this file (lines 350, 486, 496, 552, 576) use self.workflow._async_runner.run(). This appears to be a missed refactoring spot.

🔧 Suggested fix
-                            async_run(
+                            self.workflow._async_runner.run(
                                 self.workflow.dag.retrieve_storage_inputs(
                                     jobs=runjobs, also_missing_internal=True
                                 )
                             )
🧹 Nitpick comments (4)
src/snakemake/executors/touch.py (1)

17-17: Unused import.

The async_run import is no longer used in this file since the code now uses self.workflow._async_runner.run() directly.

🧹 Suggested fix
-from snakemake.common import async_run
src/snakemake/executors/dryrun.py (1)

12-12: Unused import.

The async_run import is no longer used in this file since the code now uses self.workflow._async_runner.run() directly.

🧹 Suggested fix
-from snakemake.common import async_run
src/snakemake/common/__init__.py (1)

96-114: Use of private asyncio API _get_running_loop.

Line 106 uses asyncio.events._get_running_loop() which is a private API. While this mirrors what asyncio.run does internally, it could break in future Python versions. Consider using the public asyncio.get_running_loop() wrapped in a try/except instead:

♻️ Suggested alternative
-    # asyncio.run fast-path to detect a running event loop, which we do not support yet
-    if asyncio.events._get_running_loop() is not None:
+    # Check if we're already inside a running event loop
+    try:
+        asyncio.get_running_loop()
+    except RuntimeError:
+        pass  # No running loop, this is expected
+    else:
         raise WorkflowError(
             "Error running coroutine in event loop. Snakemake currently does not "
             "support being executed from an already running event loop. "
             "If you run Snakemake e.g. from a Jupyter notebook, make sure to spawn a "
             "separate process for Snakemake.",
         )
What is the recommended way to check if an asyncio event loop is currently running in Python 3.10+?
src/snakemake/executors/local.py (1)

237-253: Add a comment explaining the intentional use of async_run() in thread-local executor.

The code correctly uses async_run() for thread-local execution (evident from commit "fix: use runner in thread-local executors"). Since asyncio event loops are thread-bound, using a thread-local runner via async_run() is architecturally sound and differs intentionally from executors like touch.py and dryrun.py that run in the main thread. Consider adding an inline comment at line 238 explaining this design choice to clarify the distinction for future reviewers.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 30e1509 and 401c869.

📒 Files selected for processing (6)
  • src/snakemake/common/__init__.py
  • src/snakemake/executors/dryrun.py
  • src/snakemake/executors/local.py
  • src/snakemake/executors/touch.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/common/__init__.py
  • src/snakemake/executors/dryrun.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
  • src/snakemake/executors/touch.py
  • src/snakemake/executors/local.py
🧠 Learnings (5)
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/common/__init__.py
  • src/snakemake/executors/dryrun.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
  • src/snakemake/executors/touch.py
  • src/snakemake/executors/local.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/executors/touch.py
  • src/snakemake/executors/local.py
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/executors/touch.py
📚 Learning: 2025-07-29T14:53:04.598Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3676
File: src/snakemake/cli.py:0-0
Timestamp: 2025-07-29T14:53:04.598Z
Learning: In the Snakemake repository, deprecated scheduler interfaces (like scheduler_ilp_solver, --scheduler-solver-path, --scheduler-greediness) should be kept for backward compatibility during the migration to the plugin-based scheduler system, rather than being removed immediately.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2024-10-11T13:12:35.827Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3132
File: snakemake/deployment/conda.py:85-88
Timestamp: 2024-10-11T13:12:35.827Z
Learning: In the `snakemake/deployment/conda.py` file, within the `Env` class, validation of `env_dir` occurs later in the code, so it's unnecessary to validate `env_dir` in the `__init__` method.

Applied to files:

  • src/snakemake/workflow.py
🧬 Code graph analysis (6)
src/snakemake/common/__init__.py (2)
src/snakemake/scheduling/job_scheduler.py (1)
  • run (504-508)
src/snakemake/workflow.py (1)
  • run (2367-2368)
src/snakemake/executors/dryrun.py (2)
src/snakemake/scheduling/job_scheduler.py (1)
  • run (504-508)
src/snakemake/workflow.py (1)
  • output_file_cache (532-533)
src/snakemake/scheduling/job_scheduler.py (1)
src/snakemake/workflow.py (1)
  • run (2367-2368)
src/snakemake/workflow.py (1)
src/snakemake/common/__init__.py (1)
  • async_runner (96-114)
src/snakemake/executors/touch.py (1)
src/snakemake/scheduling/job_scheduler.py (1)
  • run (504-508)
src/snakemake/executors/local.py (3)
src/snakemake/common/__init__.py (1)
  • async_run (117-120)
src/snakemake/workflow.py (3)
  • get_cache_mode (607-611)
  • rule (1813-2081)
  • output_file_cache (532-533)
src/snakemake/exceptions.py (1)
  • CacheMissException (584-585)
🪛 Ruff (0.14.10)
src/snakemake/common/__init__.py

107-112: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (34)
  • GitHub Check: apidocs
  • GitHub Check: tests (10, macos-latest, py313)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (8, macos-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (9, macos-latest, py313)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (6, macos-latest, py313)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (1, windows-2022, py313)
🔇 Additional comments (10)
src/snakemake/common/__init__.py (1)

117-120: LGTM!

The async_run helper correctly uses async_runner() as a context manager and delegates to the runner. This maintains backward compatibility for callers that don't need to reuse the runner.

src/snakemake/executors/touch.py (1)

70-70: LGTM!

The change correctly routes the async touch operation through the centralized workflow runner, which aligns with the PR objective of reusing the async runner to prevent port exhaustion.

src/snakemake/executors/dryrun.py (1)

50-52: LGTM!

The change correctly routes the cache existence check through the centralized workflow runner.

src/snakemake/workflow.py (3)

274-274: LGTM!

The close() call properly cleans up the async runner. This is the correct method to use for asyncio.Runner cleanup.


321-321: LGTM!

All async operation call sites have been consistently updated to use self._async_runner.run(...) instead of async_run(...). This centralizes async execution through the single workflow-managed runner instance.

Also applies to: 341-341, 974-974, 986-986, 1015-1015, 1107-1111, 1224-1225, 1260-1265, 1284-1286, 1459-1461


2176-2176: Unrelated change in diff.

Line 2176 shows a change marker (~) but the import from conda_inject import PackageManager, inject_env_file appears to be unrelated to the async runner refactoring. Please verify this change was intentional.

src/snakemake/executors/local.py (1)

240-253: New async method structure looks correct.

The acached_or_run async method properly:

  • Awaits the cache fetch operation (line 247)
  • Returns early on cache hit (line 248)
  • Calls run_func synchronously (line 251) - correct since run_func is not async
  • Awaits the cache store operation (line 253)
src/snakemake/scheduling/job_scheduler.py (3)

350-354: LGTM!

Correctly uses the centralized async runner for retrieving storage inputs for local jobs.


486-486: LGTM!

All these call sites correctly use self.workflow._async_runner.run() for:

  • Postprocessing finished jobs (line 486)
  • Updating queue input jobs (line 496)
  • Postprocessing errored jobs (line 552)
  • Updating input sizes (line 576)

Also applies to: 496-496, 552-552, 576-576


599-601: Minor formatting change.

The assertion message reformatting from multi-line to parenthesized single statement is a minor style adjustment and doesn't affect functionality.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/snakemake/scheduling/job_scheduler.py (1)

374-378: Critical: Inconsistent async runner usage.

This async_run() call was not replaced with self.workflow._async_runner.run(), creating an inconsistency with the other changes in this file. This location will still create new async runners, potentially contributing to the port exhaustion issue the PR is meant to fix.

🔧 Proposed fix
-                            async_run(
+                            self.workflow._async_runner.run(
                                 self.workflow.dag.retrieve_storage_inputs(
                                     jobs=runjobs, also_missing_internal=True
                                 )
                             )
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 401c869 and 01a80b0.

📒 Files selected for processing (1)
  • src/snakemake/scheduling/job_scheduler.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/scheduling/job_scheduler.py
🧠 Learnings (4)
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2025-07-29T14:53:04.598Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3676
File: src/snakemake/cli.py:0-0
Timestamp: 2025-07-29T14:53:04.598Z
Learning: In the Snakemake repository, deprecated scheduler interfaces (like scheduler_ilp_solver, --scheduler-solver-path, --scheduler-greediness) should be kept for backward compatibility during the migration to the plugin-based scheduler system, rather than being removed immediately.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
🧬 Code graph analysis (1)
src/snakemake/scheduling/job_scheduler.py (2)
src/snakemake/workflow.py (2)
  • run (2367-2368)
  • dag (524-525)
src/snakemake/dag.py (3)
  • postprocess (1903-1949)
  • update_queue_input_jobs (2123-2142)
  • jobs (616-618)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (46)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (7, macos-latest, py313)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (5, macos-latest, py313)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (4, macos-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (6, macos-latest, py313)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (2, ubuntu-latest, py312)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py312)
  • GitHub Check: apidocs
🔇 Additional comments (5)
src/snakemake/scheduling/job_scheduler.py (5)

350-354: LGTM: Async runner reuse implemented correctly.

The change from async_run() to self.workflow._async_runner.run() correctly implements the centralized async runner pattern to prevent port exhaustion.


486-486: LGTM: Postprocess execution properly centralized.

Correctly routes postprocessing through the centralized async runner.


496-496: LGTM: Queue input job updates properly centralized.

Correctly uses the centralized async runner for updating queue input jobs.


552-552: LGTM: Error postprocessing properly centralized.

Correctly routes error postprocessing through the centralized async runner.


576-576: LGTM: Input size updates properly centralized.

The change correctly uses the centralized async runner. The _async_runner initialization in Workflow.__init__ (line 223) occurs before JobScheduler instantiation (line 1346), so the attribute will always be available when needed.

@coroa
Copy link
Copy Markdown
Contributor Author

coroa commented Jan 8, 2026

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jan 8, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coroa
Copy link
Copy Markdown
Contributor Author

coroa commented Jan 8, 2026

@coderabbitai review

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai bot commented Jan 8, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

Copy link
Copy Markdown
Contributor

@johanneskoester johanneskoester left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this a lot! Just two requests below.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/snakemake/workflow.py (1)

15-16: Per-thread async runner implementation is sound; consider clearing the runner map on teardown

  • Initializing _async_runners and _async_lock in __post_init__ and implementing async_run(self, coro) to pick/create a per-thread asyncio.Runner via async_runner().__enter__() is a reasonable way to reuse event loops per thread and avoid repeated loop creation.
  • tear_down() now closes all stored runners under the lock, which correctly releases loop resources at the end of a workflow run.

Minor robustness improvement: after closing, you could clear the mapping to avoid holding references to closed runners in long-lived processes and to make any accidental post–tear-down async_run calls recreate a fresh runner.

Example:

     def tear_down(self):
@@
-        with self._async_lock:
-            for runner in self._async_runners.values():
-                runner.close()
+        with self._async_lock:
+            for runner in self._async_runners.values():
+                runner.close()
+            self._async_runners.clear()

This is not required for correctness in the common CLI usage but would help API callers who might reuse a Workflow instance.

Also applies to: 116-127, 175-255, 279-288

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4764838 and 257aeb8.

📒 Files selected for processing (8)
  • src/snakemake/executors/dryrun.py
  • src/snakemake/executors/local.py
  • src/snakemake/executors/touch.py
  • src/snakemake/io/__init__.py
  • src/snakemake/resources.py
  • src/snakemake/rules.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/snakemake/executors/dryrun.py
  • src/snakemake/executors/touch.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/io/__init__.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
  • src/snakemake/resources.py
  • src/snakemake/rules.py
  • src/snakemake/executors/local.py
🧠 Learnings (5)
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/io/__init__.py
  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
  • src/snakemake/resources.py
  • src/snakemake/rules.py
  • src/snakemake/executors/local.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
  • src/snakemake/executors/local.py
📚 Learning: 2025-07-29T14:53:04.598Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3676
File: src/snakemake/cli.py:0-0
Timestamp: 2025-07-29T14:53:04.598Z
Learning: In the Snakemake repository, deprecated scheduler interfaces (like scheduler_ilp_solver, --scheduler-solver-path, --scheduler-greediness) should be kept for backward compatibility during the migration to the plugin-based scheduler system, rather than being removed immediately.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/workflow.py
  • src/snakemake/executors/local.py
📚 Learning: 2024-10-11T13:12:35.827Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3132
File: snakemake/deployment/conda.py:85-88
Timestamp: 2024-10-11T13:12:35.827Z
Learning: In the `snakemake/deployment/conda.py` file, within the `Env` class, validation of `env_dir` occurs later in the code, so it's unnecessary to validate `env_dir` in the `__init__` method.

Applied to files:

  • src/snakemake/workflow.py
🧬 Code graph analysis (4)
src/snakemake/io/__init__.py (2)
src/snakemake/workflow.py (1)
  • async_run (255-262)
src/snakemake/common/__init__.py (1)
  • async_run (117-120)
src/snakemake/scheduling/job_scheduler.py (2)
src/snakemake/workflow.py (3)
  • async_run (255-262)
  • dag (537-538)
  • run (2380-2381)
src/snakemake/common/__init__.py (1)
  • async_run (117-120)
src/snakemake/workflow.py (1)
src/snakemake/common/__init__.py (2)
  • async_runner (96-114)
  • async_run (117-120)
src/snakemake/executors/local.py (2)
src/snakemake/workflow.py (4)
  • async_run (255-262)
  • get_cache_mode (620-624)
  • rule (1826-2094)
  • output_file_cache (545-546)
src/snakemake/exceptions.py (1)
  • CacheMissException (584-585)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (42)
  • GitHub Check: apidocs
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (7, macos-latest, py313)
  • GitHub Check: tests (4, macos-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (3, macos-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py312)
🔇 Additional comments (10)
src/snakemake/resources.py (2)

256-260: LGTM: Dictionary comprehension fix.

The removal of the trailing comma in the tuple unpacking is correct. The .items() method returns 2-tuples, so (k, res) is the proper unpacking syntax.


526-600: Implementation correctly propagates async_run through the resource expression evaluation chain.

The parameter passing is sound: expand_resources passes async_run=self.workflow.async_run via the aux dict, and get_input_function_aux_params preserves it based on the callable's explicit signature. Since both resource callables (lines 575, 589) explicitly list async_run as a parameter and forward it to generic_callable, the access to kwargs["async_run"] at line 535 is always safe—there is no KeyError risk. All invocation sites already pass the required parameter through the expand_resources path.

src/snakemake/rules.py (2)

1071-1075: Benchmark wildcard error message change is fine

The message wording/formatting change preserves semantics and won’t affect behavior. No further action needed.


1091-1106: async_run propagation into resource callables looks correct

Passing async_run=self.workflow.async_run via aux is consistent with the existing aux-parameter plumbing and enables resource functions that opt in via an async_run parameter to reuse the workflow’s async runner. Callables that don’t declare async_run remain unaffected.

Please double-check that any resource functions intended to use async execution explicitly declare an async_run parameter in their signature so they actually receive this runner.

src/snakemake/io/__init__.py (2)

459-465: Clarified error for callable IOFile.file access

Raising a clear ValueError when accessing .file on a callable-backed IOFile is appropriate and makes the misuse explicit without changing behavior for non-callable cases.


50-55: Async runner indirection via globals() is reasonable but needs disciplined injection

Using async_run imported as async_run_fallback and then resolving via

globals().get("async_run", async_run_fallback)(sizes())

allows callers to inject a workflow-specific async runner while preserving the previous per-call runner as a fallback. This aligns with the PR goal of reusing the async runner where possible.

The correctness now hinges on whoever evaluates resource expressions setting and restoring snakemake.io.async_run in a thread-safe manner (e.g. per-workflow, per-eval, with proper cleanup) so that concurrent workflows or tests don’t leak runners into each other.

Please verify that the code paths evaluating resource expressions (and any others that rely on InputFiles.size_*) explicitly set snakemake.io.async_run = workflow.async_run for the duration of the evaluation and restore the previous value afterwards, and that this is safe under the expected concurrency model.

Also applies to: 1927-1938

src/snakemake/executors/local.py (1)

112-174: Local executor’s use of workflow.async_run and async cache path looks correct

  • Running job.prepare() via self.workflow.async_run(job.prepare()) correctly routes preparation through the per-workflow async runner.
  • Splitting cache handling into async acached_or_run and wrapping it with self.workflow.async_run(...) in cached_or_run preserves synchronous executor semantics while allowing async fetch/store to reuse the same runner.
  • Exception handling for CacheMissException on fetch and unconditional await on store remain consistent with typical cache behavior.

Also applies to: 236-253

src/snakemake/scheduling/job_scheduler.py (1)

337-379: Migration to workflow.async_run in scheduler hot paths looks good

Switching the various async operations in schedule() and _finish_jobs()/update_queue_input_jobs() to self.workflow.async_run(...) (retrieving storage inputs, job postprocess, queue updates) is consistent with the new per-workflow async runner and preserves the previous control flow.

Also applies to: 416-487, 490-497

src/snakemake/workflow.py (2)

328-355: Call sites migrated to workflow.async_run are consistent

All updated call sites that now use self.async_run(...) (source archive cleanup/upload, DAG clean, change listing, summary join, DAG init/update, wait_for_files, postprocess, and storage output/cleanup) correctly pass coroutine objects and rely on the new per-workflow async runner. This keeps previous behavior while enabling runner reuse.

Also applies to: 987-1002, 1023-1029, 1112-1124, 1235-1239, 1271-1278, 1296-1299, 1471-1475


2187-2225: Code changes look fine; conda-inject is already declared as a required dependency

The conda-inject package is properly declared in pyproject.toml as a required dependency with version constraints ("conda-inject>=1.3.1,<2.0"). No additional action needed.

Likely an incorrect or invalid review comment.

@coroa
Copy link
Copy Markdown
Contributor Author

coroa commented Jan 8, 2026

Ok, i now understood your async_run workflow method suggestion, with this it was also possible to make it thread-safe, and after some poking i understood also how to hand the method into the eval_resource_expression.

In my mind this fixes everything that was to dislike with this approach, we now hold an async_runner per thread and close down all event loops in the workflow teardown.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/snakemake/rules.py (1)

708-711: Consider clarifying the comment.

The logic is correct: async_run is a bound method reference (e.g., self.workflow.async_run) that should be passed directly to the function, not evaluated like the other callables in aux_params. The current comment is a bit cryptic about why this distinction matters.

📝 Suggested comment improvement
-        # async_run needs to be passed as a method and therefore is only added after
-        # evaluating the others
+        # async_run is a bound method reference that should be passed as-is,
+        # not evaluated like other callables in aux_params, so it's added after evaluation
         if async_run is not None and "async_run" in get_function_params(func):
             _aux_params["async_run"] = async_run
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 257aeb8 and a4798c5.

📒 Files selected for processing (2)
  • src/snakemake/rules.py
  • src/snakemake/scheduling/job_scheduler.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/rules.py
🧠 Learnings (4)
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2025-07-29T14:53:04.598Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3676
File: src/snakemake/cli.py:0-0
Timestamp: 2025-07-29T14:53:04.598Z
Learning: In the Snakemake repository, deprecated scheduler interfaces (like scheduler_ilp_solver, --scheduler-solver-path, --scheduler-greediness) should be kept for backward compatibility during the migration to the plugin-based scheduler system, rather than being removed immediately.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/scheduling/job_scheduler.py
  • src/snakemake/rules.py
🧬 Code graph analysis (2)
src/snakemake/scheduling/job_scheduler.py (3)
src/snakemake/common/__init__.py (1)
  • async_run (117-120)
src/snakemake/workflow.py (2)
  • async_run (255-262)
  • dag (537-538)
src/snakemake/dag.py (3)
  • postprocess (1903-1949)
  • update_queue_input_jobs (2123-2142)
  • jobs (616-618)
src/snakemake/rules.py (2)
src/snakemake/workflow.py (2)
  • async_run (255-262)
  • func (1706-1712)
src/snakemake/common/__init__.py (1)
  • get_function_params (303-307)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (45)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (6, macos-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (7, macos-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py312)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (1, macos-latest, py313)
  • GitHub Check: tests (4, macos-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (5, macos-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py312)
🔇 Additional comments (4)
src/snakemake/rules.py (3)

683-683: LGTM: New async_run parameter enables async runner propagation.

The addition of the async_run parameter allows the workflow's async runner to be threaded through to input functions and resource callables, supporting the PR's goal of reusing async runners to prevent port exhaustion.


1078-1078: LGTM: Error message simplification.

The single-line error message is clearer and more concise.


1100-1100: LGTM: Propagating async_run to resource callables.

Correctly passes the workflow's async runner to resource callables, enabling them to execute async operations via the centralized per-workflow runner. This aligns with the PR's goal of reusing async runners.

src/snakemake/scheduling/job_scheduler.py (1)

350-354: LGTM: Consistent migration to workflow's async runner.

All async dispatch points in the job scheduler have been correctly updated to use self.workflow.async_run(...) instead of the external async_run(...) helper. This ensures that:

  1. All asynchronous operations (storage retrieval, postprocessing, queue updates, input size updates) go through the per-workflow runner
  2. The per-thread async runners are reused, preventing port exhaustion
  3. Thread safety is maintained via the workflow's internal locking mechanism

The changes are consistent and align with the PR's objective of centralizing async execution through the workflow object.

Also applies to: 374-378, 486-486, 496-496, 552-552, 576-576

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @src/snakemake/workflow.py:
- Around line 287-289: The tear_down method closes all runners but never shuts
down the shared ThreadPoolExecutor (_async_executor), causing lingering threads;
update tear_down to, after closing _async_runners (while holding or immediately
after releasing _async_lock), check if self._async_executor is not None and call
self._async_executor.shutdown(wait=True) (or wait=False if non-blocking teardown
is desired), then set self._async_executor = None to avoid reuse; ensure this is
done safely (guard against multiple tear_down calls) and reference the existing
attributes _async_executor, _async_lock, and _async_runners in the change.
🧹 Nitpick comments (1)
src/snakemake/workflow.py (1)

257-264: Manual __enter__() call without corresponding __exit__() - acceptable but unconventional.

The pattern of calling __enter__() directly without a context manager is intentional here since runners need to persist across multiple async_run calls. The cleanup is handled in tear_down() via runner.close().

This works correctly but be aware that if an exception occurs during runner creation after __enter__(), the runner may not be properly cleaned up. Consider wrapping the creation in a try-except that calls close() on failure:

♻️ Optional defensive cleanup
 def async_run(self, coro):
     threadid = threading.get_ident()
     with self._async_lock:
         runner = self._async_runners.get(threadid)
         if runner is None:
-            runner = async_runner(executor=self._async_executor).__enter__()
-            self._async_runners[threadid] = runner
+            runner = async_runner(executor=self._async_executor)
+            try:
+                runner.__enter__()
+            except Exception:
+                runner.close()
+                raise
+            self._async_runners[threadid] = runner
     return runner.run(coro)
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a4798c5 and 1327e07.

📒 Files selected for processing (2)
  • src/snakemake/common/__init__.py
  • src/snakemake/workflow.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/workflow.py
  • src/snakemake/common/__init__.py
🧠 Learnings (3)
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-11T13:12:35.827Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3132
File: snakemake/deployment/conda.py:85-88
Timestamp: 2024-10-11T13:12:35.827Z
Learning: In the `snakemake/deployment/conda.py` file, within the `Env` class, validation of `env_dir` occurs later in the code, so it's unnecessary to validate `env_dir` in the `__init__` method.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/workflow.py
  • src/snakemake/common/__init__.py
🧬 Code graph analysis (2)
src/snakemake/workflow.py (1)
src/snakemake/common/__init__.py (2)
  • async_runner (96-148)
  • async_run (151-175)
src/snakemake/common/__init__.py (2)
src/snakemake/workflow.py (2)
  • async_run (257-264)
  • run (2382-2383)
src/snakemake/scheduling/job_scheduler.py (1)
  • run (504-508)
🪛 Ruff (0.14.10)
src/snakemake/common/__init__.py

129-134: Avoid specifying long messages outside the exception class

(TRY003)


138-140: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (46)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (9, macos-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (8, macos-latest, py313)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (5, macos-latest, py313)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (3, macos-latest, py313)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py312)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py312)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: apidocs
🔇 Additional comments (4)
src/snakemake/common/__init__.py (2)

96-148: LGTM - Well-structured async runner factory.

The implementation correctly:

  • Detects running event loops to prevent nested execution issues
  • Validates mutual exclusivity of executor and loop_factory parameters
  • Creates a custom loop factory when an executor is provided

Note: The use of private API asyncio.events._get_running_loop() (line 128) is intentional here to avoid exception-based control flow that asyncio.get_running_loop() would require.


151-175: LGTM - Clean convenience wrapper.

The context manager pattern ensures proper cleanup of the runner after coroutine execution.

src/snakemake/workflow.py (2)

225-227: LGTM - Per-thread async runner infrastructure.

The initialization correctly sets up:

  • A dictionary to hold per-thread runners
  • A shared ThreadPoolExecutor for all runners
  • A lock to protect runner creation

336-336: LGTM - Call sites correctly migrated to self.async_run().

All async operations are now routed through the workflow's per-thread runner infrastructure, which should prevent port exhaustion by reusing event loops and executors.

Also applies to: 356-356, 989-989, 1001-1001, 1030-1030, 1122-1126, 1239-1240, 1275-1280, 1299-1301, 1474-1476

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In @src/snakemake/workflow.py:
- Around line 287-293: The code is creating a new attribute
runner._default_executor instead of clearing the loop's default executor, so
runner.close() still shuts down the shared executor; replace the bogus
assignment with clearing the loop's executor reference: for each runner in
self._async_runners.values(), if runner has a _loop attribute and runner._loop
has a _default_executor attribute, set runner._loop._default_executor = None
before calling runner.close(), then call self._async_executor.shutdown(); this
ensures you clear the real executor reference on the loop rather than creating a
new attribute on the Runner.
🧹 Nitpick comments (1)
src/snakemake/workflow.py (1)

257-264: Thread-safety design is sound, but the context manager usage is unconventional.

The per-thread runner approach correctly prevents contention. The lock protects dictionary mutations while allowing concurrent coroutine execution across threads.

Note: __enter__() is called without a corresponding __exit__() — cleanup relies on explicit close() in tear_down. This works but is an atypical pattern. Consider documenting this in a comment to clarify the intentional lifecycle management.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1327e07 and 4e8492e.

📒 Files selected for processing (1)
  • src/snakemake/workflow.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/workflow.py
🧠 Learnings (4)
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-11T13:12:35.827Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3132
File: snakemake/deployment/conda.py:85-88
Timestamp: 2024-10-11T13:12:35.827Z
Learning: In the `snakemake/deployment/conda.py` file, within the `Env` class, validation of `env_dir` occurs later in the code, so it's unnecessary to validate `env_dir` in the `__init__` method.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/workflow.py
🧬 Code graph analysis (1)
src/snakemake/workflow.py (1)
src/snakemake/common/__init__.py (2)
  • async_runner (96-148)
  • async_run (151-175)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (46)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (8, macos-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (7, macos-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (3, macos-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (6, macos-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: tests (2, ubuntu-latest, py312)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py312)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: apidocs
🔇 Additional comments (7)
src/snakemake/workflow.py (7)

6-6: LGTM!

The new imports support the per-thread async runner infrastructure correctly: ThreadPoolExecutor provides a shared executor, threading enables thread identification and locking, and async_runner is the factory from common/__init__.py that creates properly configured asyncio.Runner instances.

Also applies to: 16-16, 120-120


225-228: LGTM!

The async state initialization is appropriate: a dictionary for per-thread runners, a shared ThreadPoolExecutor for async I/O operations, and a lock for thread-safe access. The executor uses Python's default worker count, which is suitable for I/O-bound workloads.


340-340: LGTM!

Storage operations correctly routed through the per-workflow async runner.

Also applies to: 360-360


993-993: LGTM!

DAG operations (clean, get_outputs_with_changes) and the summary async generator wrapper correctly use the per-workflow async runner.

Also applies to: 1005-1005, 1034-1034


1126-1130: LGTM!

Report creation and DAG building operations correctly migrated to use the per-workflow async runner.

Also applies to: 1243-1244


1279-1284: LGTM!

Execution-phase async operations (file waiting, DAG post-processing, storage management) correctly migrated to the per-workflow async runner pattern.

Also applies to: 1303-1305, 1478-1480


2195-2195: LGTM!

Import statement formatting is consistent with the codebase style.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/snakemake/workflow.py (1)

287-294: Use the public asyncio shutdown API instead of manipulating private attributes.

The code accesses runner._loop._default_executor, which are private attributes. While _default_executor has remained stable in Python 3.8–3.12, relying on private internals is a maintenance risk.

Python 3.9+ provides a public API for this: await loop.shutdown_default_executor(). Since Snakemake requires Python ≥ 3.11, this API is available in all supported versions. Consider using it instead to avoid private attribute access:

 with self._async_lock:
     for runner in self._async_runners.values():
-        # Remove executor from runner manually to prevent `runner.close`
-        # shutting it down before the other runners finish using it
-        if runner._loop is not None:
-            runner._loop._default_executor = None
+        # Shutdown the loop's default executor before closing to prevent
+        # the shared executor from being shut down by the first runner
+        try:
+            loop = runner._loop
+            if loop is not None:
+                await loop.shutdown_default_executor()
+        except AttributeError:
+            pass  # Fallback for event loop implementations without this method
         runner.close()
 self._async_executor.shutdown()

Note: If tear_down() is not async, you may need to adjust the calling context or run the shutdown in a separate async context.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4e8492e and bf20b1b.

📒 Files selected for processing (1)
  • src/snakemake/workflow.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/workflow.py
🧠 Learnings (4)
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-11T13:12:35.827Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3132
File: snakemake/deployment/conda.py:85-88
Timestamp: 2024-10-11T13:12:35.827Z
Learning: In the `snakemake/deployment/conda.py` file, within the `Env` class, validation of `env_dir` occurs later in the code, so it's unnecessary to validate `env_dir` in the `__init__` method.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/workflow.py
🧬 Code graph analysis (1)
src/snakemake/workflow.py (1)
src/snakemake/common/__init__.py (2)
  • async_runner (96-148)
  • async_run (151-175)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (44)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (10, macos-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (7, macos-latest, py313)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (5, macos-latest, py313)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (2, ubuntu-latest, py312)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py312)
🔇 Additional comments (12)
src/snakemake/workflow.py (12)

225-227: LGTM! Per-thread async runner state initialization looks correct.

The design of maintaining per-thread runners with a shared executor addresses the port exhaustion issue effectively. One consideration: ThreadPoolExecutor() is created without max_workers, which defaults to min(32, os.cpu_count() + 4) in Python 3.8+. This should be fine for most workflows, but worth noting if executor thread count becomes a concern.


257-264: Thread-safe per-thread runner access implemented correctly.

The design properly isolates runners per thread while sharing the executor. The lock scope is appropriately limited to dict access, avoiding serialization of async operations.

Note: asyncio.Runner.run() cannot be called re-entrantly. If a coroutine executed via self.async_run() somehow triggers another self.async_run() call on the same thread, it would fail. This matches the behavior of the original async_run wrapper in common/__init__.py, so this is not a regression.


1244-1245: LGTM! DAG initialization async calls properly routed through workflow's runner.

The _build_dag method correctly uses self.async_run for both dag.init() and dag.update_checkpoint_dependencies(), ensuring consistent runner reuse during DAG construction.


1304-1306: LGTM! Execution flow async calls updated consistently.

The async postprocessing call is correctly routed through the workflow's runner.


1479-1481: LGTM! Storage operations use the workflow's async runner.

Both store_storage_outputs and cleanup_storage_objects now properly reuse the per-thread runner, which should help with the port exhaustion issue mentioned in the PR objectives.


341-341: LGTM! Source archive cleanup uses the workflow's async runner.


361-361: LGTM! Source upload uses the workflow's async runner.


994-994: LGTM! DAG clean operation uses the workflow's async runner.


1006-1006: LGTM! Change listing uses the workflow's async runner.


1035-1035: LGTM! Summary generation uses the workflow's async runner.


1127-1131: LGTM! Report creation uses the workflow's async runner.


1280-1285: LGTM! File wait operation uses the workflow's async runner.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/snakemake/common/__init__.py (1)

96-148: Well-structured async runner factory with proper validation.

The implementation correctly:

  • Detects running event loops via the fast-path check
  • Enforces mutual exclusivity between executor and loop_factory parameters
  • Properly configures the event loop with the executor when provided

One minor observation: the inner loop_factory function (lines 142-146) shadows the parameter name. While this works correctly due to closure semantics, consider renaming for clarity (e.g., _loop_factory).

Optional: rename inner function to avoid shadowing
     if executor is not None:
         if loop_factory is not None:
             raise ValueError(
                 "only executor or loop_factory can be given as an argument"
             )

-        def loop_factory():
+        def _loop_factory():
             loop = asyncio.new_event_loop()
             loop.set_default_executor(executor)
             asyncio.set_event_loop(loop)
             return loop

-    return asyncio.Runner(loop_factory=loop_factory)
+        loop_factory = _loop_factory
+
+    return asyncio.Runner(loop_factory=loop_factory)
📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bf20b1b and 44b2408.

📒 Files selected for processing (4)
  • src/snakemake/common/__init__.py
  • src/snakemake/io/__init__.py
  • src/snakemake/rules.py
  • src/snakemake/workflow.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/snakemake/io/init.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

⚙️ CodeRabbit configuration file

**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the self argument of methods.
Do not suggest type annotation of the cls argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a return statement.

Files:

  • src/snakemake/workflow.py
  • src/snakemake/rules.py
  • src/snakemake/common/__init__.py
🧠 Learnings (4)
📚 Learning: 2024-11-05T09:41:56.469Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3184
File: snakemake/dag.py:1340-1342
Timestamp: 2024-11-05T09:41:56.469Z
Learning: The method `self.workflow.persistence.software_stack_changed(job)` in `snakemake/persistence.py` is not asynchronous.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-04T16:12:18.927Z
Learnt from: lczech
Repo: snakemake/snakemake PR: 3113
File: snakemake/scheduler.py:912-914
Timestamp: 2024-10-04T16:12:18.927Z
Learning: In `snakemake/scheduler.py`, avoid suggesting the use of `asyncio.gather` in the `jobs_rewards` method due to overhead concerns and the need for immediate results.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2024-10-11T13:12:35.827Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3132
File: snakemake/deployment/conda.py:85-88
Timestamp: 2024-10-11T13:12:35.827Z
Learning: In the `snakemake/deployment/conda.py` file, within the `Env` class, validation of `env_dir` occurs later in the code, so it's unnecessary to validate `env_dir` in the `__init__` method.

Applied to files:

  • src/snakemake/workflow.py
📚 Learning: 2026-01-08T11:13:20.523Z
Learnt from: johanneskoester
Repo: snakemake/snakemake PR: 3643
File: src/snakemake/sourcecache.py:474-492
Timestamp: 2026-01-08T11:13:20.523Z
Learning: In Python files under src/snakemake, ensure that the per-run cache_path for source files does not change within a single workflow run, so that HostingProviderFile._hosted_repos can be keyed reliably by repository name. This invariant should be documented and enforced (e.g., by freezing cache_path after initialization, avoiding mutations during a run, and ensuring any caching uses a run-scoped key). This pattern applies to all modules in the Snakemake source cache area where per-run caching and repository hosting are implemented.

Applied to files:

  • src/snakemake/workflow.py
  • src/snakemake/rules.py
  • src/snakemake/common/__init__.py
🧬 Code graph analysis (3)
src/snakemake/workflow.py (1)
src/snakemake/common/__init__.py (2)
  • async_runner (96-148)
  • async_run (151-175)
src/snakemake/rules.py (2)
src/snakemake/workflow.py (2)
  • async_run (258-265)
  • func (1716-1722)
src/snakemake/common/__init__.py (2)
  • async_run (151-175)
  • get_function_params (358-362)
src/snakemake/common/__init__.py (2)
src/snakemake/workflow.py (2)
  • async_run (258-265)
  • run (2390-2391)
src/snakemake/scheduling/job_scheduler.py (1)
  • run (504-508)
🪛 Ruff (0.14.10)
src/snakemake/common/__init__.py

129-134: Avoid specifying long messages outside the exception class

(TRY003)


138-140: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (46)
  • GitHub Check: tests (9, windows-2022, py313)
  • GitHub Check: tests (10, ubuntu-latest, py312)
  • GitHub Check: tests (6, macos-latest, py313)
  • GitHub Check: tests (10, ubuntu-latest, py311)
  • GitHub Check: tests (9, macos-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py313)
  • GitHub Check: tests (5, macos-latest, py313)
  • GitHub Check: tests (10, windows-2022, py313)
  • GitHub Check: tests (9, ubuntu-latest, py313)
  • GitHub Check: tests (6, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py311)
  • GitHub Check: tests (7, ubuntu-latest, py312)
  • GitHub Check: tests (8, windows-2022, py313)
  • GitHub Check: tests (8, ubuntu-latest, py313)
  • GitHub Check: tests (7, ubuntu-latest, py311)
  • GitHub Check: tests (6, ubuntu-latest, py311)
  • GitHub Check: tests (10, ubuntu-latest, py313)
  • GitHub Check: tests (9, ubuntu-latest, py312)
  • GitHub Check: tests (6, ubuntu-latest, py313)
  • GitHub Check: tests (6, ubuntu-latest, py312)
  • GitHub Check: tests (9, ubuntu-latest, py311)
  • GitHub Check: tests (8, ubuntu-latest, py312)
  • GitHub Check: tests (7, windows-2022, py313)
  • GitHub Check: tests (4, windows-2022, py313)
  • GitHub Check: tests (5, ubuntu-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py311)
  • GitHub Check: tests (4, ubuntu-latest, py311)
  • GitHub Check: tests (1, ubuntu-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py312)
  • GitHub Check: tests (3, ubuntu-latest, py313)
  • GitHub Check: tests (5, windows-2022, py313)
  • GitHub Check: tests (2, windows-2022, py313)
  • GitHub Check: tests (3, windows-2022, py313)
  • GitHub Check: tests (1, ubuntu-latest, py311)
  • GitHub Check: tests (3, ubuntu-latest, py312)
  • GitHub Check: tests (2, ubuntu-latest, py311)
  • GitHub Check: tests (2, macos-latest, py313)
  • GitHub Check: tests (5, ubuntu-latest, py312)
  • GitHub Check: tests (4, ubuntu-latest, py313)
  • GitHub Check: tests (1, windows-2022, py313)
  • GitHub Check: tests (1, macos-latest, py313)
  • GitHub Check: tests (2, ubuntu-latest, py312)
  • GitHub Check: tests (3, ubuntu-latest, py311)
  • GitHub Check: tests (2, ubuntu-latest, py313)
  • GitHub Check: tests (1, ubuntu-latest, py313)
  • GitHub Check: apidocs
🔇 Additional comments (8)
src/snakemake/common/__init__.py (1)

151-175: LGTM!

Clean convenience wrapper that properly uses the context manager for resource cleanup.

src/snakemake/rules.py (2)

676-711: Correct implementation of async_run parameter propagation.

The logic properly:

  1. Accepts async_run as an optional parameter
  2. Only injects it into _aux_params if both conditions are met:
    • async_run is not None
    • The target function actually accepts an async_run parameter

This deferred injection (after evaluating other aux params) correctly prevents async_run from being called during the callable evaluation phase.


1100-1112: LGTM!

Correctly passes self.workflow.async_run (the method reference) to resource evaluation, enabling callable resources to execute async operations through the workflow's managed runner.

src/snakemake/workflow.py (5)

226-228: Proper initialization of async runner infrastructure.

The three instance variables establish the foundation for per-thread async runner management:

  • _async_runners: dict mapping thread IDs to runners
  • _async_executor: shared ThreadPoolExecutor
  • _async_lock: threading.Lock for thread-safe access

258-265: Thread-safe lazy initialization of per-thread runners.

The pattern correctly:

  • Uses lock for thread-safe runner creation/lookup
  • Releases lock before runner.run(coro) to allow concurrent execution across threads
  • Shares a single ThreadPoolExecutor across all runners

Note: Calling __enter__() directly without a corresponding __exit__() here is intentional—cleanup is handled in tear_down(). This is a valid pattern for resources with workflow-scoped lifetimes.


1243-1246: LGTM!

All call sites consistently use self.async_run() to route async operations through the workflow's managed per-thread runner infrastructure.


118-128: LGTM!

Import correctly changed from async_run to async_runner, as the Workflow class now provides its own async_run method that uses the factory.


288-295: Teardown logic correctly prevents premature executor shutdown.

The code:

  1. Acquires lock during cleanup
  2. Guards against runner._loop being None
  3. Detaches the executor from each loop before closing runners
  4. Shuts down the shared executor only after all runners are closed

Note: The code accesses runner._loop and runner._loop._default_executor, which are private asyncio internals. These have been stable since Python 3.7, and the existing null check on runner._loop provides appropriate defensive coverage. As an alternative, you could use the public API runner.get_loop()._default_executor = None for equivalent behavior.

@coroa
Copy link
Copy Markdown
Contributor Author

coroa commented Jan 12, 2026

@johanneskoester This one would be ready again.

  • I included your suggestion to have an workflow.async_run method,
  • solved the thread-safety issue and
  • added a shared thread pool executor so that there only is a limited number of threads.

@johanneskoester johanneskoester merged commit 385e0ca into snakemake:main Jan 15, 2026
60 checks passed
@johanneskoester
Copy link
Copy Markdown
Contributor

Thanks a lot @melund and @coroa! Fantastic work!

johanneskoester pushed a commit that referenced this pull request Jan 15, 2026
🤖 I have created a release *beep* *boop*
---


##
[9.14.7](v9.14.6...v9.14.7)
(2026-01-15)


### Bug Fixes

* report psutil errors in case benchmark fails
([#3925](#3925))
([293ec40](293ec40))
* reuse async runner to prevent port exhaustion
([#3911](#3911))
([385e0ca](385e0ca))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants