Conversation
📝 Walkthrough

Adds Windows-specific event-loop handling to `async_run`.
|
Please format your code with |
|
@coderabbitai review |
✅ Actions performed: review triggered.
|
Repeated calls to `asyncio.run()` on Windows create new `ProactorEventLoop`s, which I think establishes a new TCP socket every time. I think this uses up all the ports and hangs long execution runs. This patch replaces `asyncio.run()` with a persistent thread-local event loop on Windows.
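The loop-per-call behaviour can be demonstrated in plain CPython, independent of Snakemake; a minimal sketch:

```python
# asyncio.run() builds and closes a fresh event loop on every call, while
# run_until_complete() on a kept loop reuses the same one.
import asyncio

async def probe():
    return asyncio.get_running_loop()

loop_a = asyncio.run(probe())   # new loop, closed when run() returns
loop_b = asyncio.run(probe())   # another brand-new loop
assert loop_a is not loop_b
assert loop_a.is_closed() and loop_b.is_closed()

loop = asyncio.new_event_loop()  # persistent loop, reused across calls
r1 = loop.run_until_complete(probe())
r2 = loop.run_until_complete(probe())
assert r1 is r2 is loop
loop.close()
```

On Windows the per-call cost is higher because each `ProactorEventLoop` also sets up a self-pipe socket pair, which is the churn this patch avoids.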
|
@coderabbitai review |
✅ Actions performed: review triggered.
|
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/snakemake/common/__init__.py (1)
120-122: Consider using explicit exception chaining for better tracebacks.

The static analysis correctly notes that `raise ... from e` provides clearer tracebacks. However, the existing non-Windows path (lines 126-134) uses the same pattern of passing `e` as a constructor argument to `WorkflowError`, so this appears intentional for consistency with how `WorkflowError` is designed. If you do want to improve this, both paths could be updated:
♻️ Optional: Use explicit exception chaining
```diff
     try:
         return _thread_local.loop.run_until_complete(coroutine)
     except RuntimeError as e:
         coroutine.close()
-        raise WorkflowError("Error running coroutine in event loop.", e)
+        raise WorkflowError("Error running coroutine in event loop.") from e
```
📜 Review details
📒 Files selected for processing (1)
src/snakemake/common/__init__.py
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py
⚙️ CodeRabbit configuration file
**/*.py: Do not try to improve formatting.
Do not suggest type annotations for functions that are defined inside of functions or methods.
Do not suggest type annotation of the `self` argument of methods.
Do not suggest type annotation of the `cls` argument of classmethods.
Do not suggest return type annotation if a function or method does not contain a `return` statement.
Files:
src/snakemake/common/__init__.py
🪛 Ruff (0.14.10)
src/snakemake/common/__init__.py
122-122: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
122-122: Avoid specifying long messages outside the exception class
(TRY003)
🔇 Additional comments (2)
src/snakemake/common/__init__.py (2)
96-100: LGTM — Conditional import and thread-local setup is appropriate.

Using `threading.local()` at module level ensures each thread gets its own event loop, which correctly isolates concurrent execution contexts while avoiding the repeated loop creation/teardown that causes port exhaustion.
109-119: Sound approach for mitigating Windows port exhaustion.

The implementation correctly:

- Lazily creates one event loop per thread and reuses it
- Checks both for a missing loop (`not hasattr`) and a closed loop (`is_closed()`)
- Calls `set_event_loop` to register the newly created loop with asyncio's thread-local state

This avoids the repeated `ProactorEventLoop` creation/teardown that was exhausting ephemeral ports.
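The lazy per-thread creation described in the review can be exercised in isolation; this is a hedged re-sketch of the pattern under review (names simplified), not the actual Snakemake code:

```python
# One event loop per thread, created lazily and reused on later calls.
import asyncio
import threading

_thread_local = threading.local()

def get_thread_loop():
    # create a loop only if this thread has none, or its loop was closed
    if not hasattr(_thread_local, "loop") or _thread_local.loop.is_closed():
        _thread_local.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_thread_local.loop)
    return _thread_local.loop

results = {}

def worker(name):
    first = get_thread_loop()
    # a second call in the same thread hands back the identical loop
    results[name] = (first, get_thread_loop())

for name in ("t1", "t2"):
    t = threading.Thread(target=worker, args=(name,))
    t.start()
    t.join()

assert results["t1"][0] is results["t1"][1]      # reused within a thread
assert results["t1"][0] is not results["t2"][0]  # isolated between threads
```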
johanneskoester
left a comment
Looks good to me. Wouldn't the same make sense on Unix-like OSes as well?
|
Also, I have recently spotted that btop reports large numbers of threads for the main snakemake process on Linux. Although they all seem to sleep, it seems to me that this might be related. Maybe the async spawns are not getting closed by Python?
|
Just wanted to chime in quickly that I debugged the same issue last week and came to a different fix by persisting an async_runner on the workflow object so that the job scheduler can just reuse the same async event loop: #3911. The solution here is also quite elegant, I find. IMHO, ideally one would instead rewrite quite some of the Snakemake inner logic to be async, and then just keep within the same async loop. I started doing that, too, but did not find a good approach for the …
I am not sure. The stalling/hanging is specific to Windows, where Python implements sockets with real TCP connections, which have a "cooling off" period before they are freed. I think Linux uses some sort of pipe to communicate, and those are freed immediately. Also, using … So I think it is safest to leave this fix Windows-only.
|
@coroa Was your problem also on Windows?
Yes. They are related to the same underlying pattern: calling `asyncio.run()` repeatedly. When we run concurrently (e.g., `-j 20`), the scheduler invokes `async_run` over and over. I believe every new event loop creates its own default `ThreadPoolExecutor` (usually `cpu_count` workers) if it needs to run blocking code.

On Linux: this results in a "pool of pools." 20 jobs × ~20 default threads = ~400 idle threads. They sleep (as you saw) so they don't consume CPU, but they do bloat the thread count.

On Windows: we hit a harder limit. Each new loop also creates a TCP socket pair. 20 jobs × rapid scheduling cycles = thousands of TCP connections, which eventually exhausts the port range and crashes the process.

This PR fixes the crash on Windows by reusing the loop. On Linux, the high thread count is "safe" (if messy), but applying this same fix there would likely reduce that thread bloat as well. I kept the change Windows-only for now.
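The "pool of pools" mechanics are observable in plain Python, independent of Snakemake: `run_in_executor(None, ...)` lazily creates the loop's default `ThreadPoolExecutor`, so the process thread count rises as soon as blocking work is dispatched. A minimal illustrative sketch:

```python
import asyncio
import threading

async def dispatch_blocking():
    loop = asyncio.get_running_loop()
    # executor=None lazily creates this loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, threading.active_count)

before = threading.active_count()
during = asyncio.run(dispatch_blocking())
# at least one pool worker thread was spawned to run the blocking call
assert during > before
```

Multiply that by one fresh loop per `async_run` call and per concurrent job, and the idle-thread counts reported above follow.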
Yes, the actual issue only surfaced on Windows in the proactor. Same stack trace as your py-spy one, but I have observed hangs on Linux too (which might be unrelated).
|
@johanneskoester, I could make a fix like this for all platforms:

```python
import asyncio
import concurrent.futures
import threading

# WorkflowError is already available in this module (snakemake.common)

_thread_local = threading.local()


def async_run(coroutine):
    """Attaches to running event loop or creates a new one to execute a
    coroutine.

    .. seealso::
        https://github.com/snakemake/snakemake/issues/1105
        https://stackoverflow.com/a/65696398
    """
    # We reuse a thread-local loop to avoid:
    # 1. On Windows: port exhaustion (ProactorEventLoop creates TCP sockets)
    # 2. On Linux: thread bloat (new loops spawn new ThreadPoolExecutors)
    if not hasattr(_thread_local, "loop") or _thread_local.loop.is_closed():
        _thread_local.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(_thread_local.loop)
        # Limit inner threads to 1, effectively flattening the "pool of pools"
        _thread_local.loop.set_default_executor(
            concurrent.futures.ThreadPoolExecutor(max_workers=1)
        )
    try:
        return _thread_local.loop.run_until_complete(coroutine)
    except RuntimeError as e:
        coroutine.close()
        raise WorkflowError("Error running coroutine in event loop.", e)
```

Crucially, to fix the bloat, I can also limit the executor size on these internal loops. Since the code is already running inside a parallel Snakemake worker, it shouldn't need a massive thread pool of its own.

@johanneskoester / @coroa, do you think that there are any side effects of doing this?
|
@johanneskoester, just merge this Windows PR right away. I also made a PR (#3912) with the extended version for all OSes. But I have no way to really test this on Linux, so I would think that the extended PR should only be merged once someone has used it for some real-world applications.
|
I actually was also able to produce infrequent lock-ups on Linux when using … So I would not make it a Windows-only fix, but would suggest either #3912 or #3911. Which one? Phew. I originally thought the unsolved issues in #3911 (not thread-safe, and resource expressions need to spawn a new async runner) made the global-variable approach here a clear winner, but then I found solutions for both.

The main differences I see are: #3911 uses the high-level `asyncio.Runner` interface (which was introduced in 3.11, which might be an issue) and cleans up the event loops when the runner is closed on teardown of the Workflow object, but it needs to jump through several hoops to make it available everywhere consistently. #3912 uses the low-level event loop management interfaces to ensure that each thread reuses the same event loop, so the final effect is the same, except that the event loops are not closed at the end, which might hide exceptions in asyncio tasks (which Snakemake is typically not using) and reuses the same event loops between several workflow instances (which Snakemake typically does not have). But it is a very local code change and will practically achieve the same outcome. So, 🤷. I would not use …
|
@coroa You mentioned: "there is a high chance instantiating one ... does actually increase the number of threads rather than decrease it". Actually, the motivation for setting the default executor was the thread bloat. The default `ThreadPoolExecutor` of each loop can grow to roughly `cpu_count` workers. By forcing `max_workers=1`, each loop is capped at a single inner worker thread. I think someone needs to actually try to run Snakemake with the change in #3912 and see what it does with the thread bloat on Linux. Until then I vote for merging this PR to fix the critical issue on Windows with minimal code.
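The effect of the `max_workers=1` cap can be checked in isolation: with a single-threaded default executor installed via `set_default_executor`, every `run_in_executor(None, ...)` call on that loop funnels through one worker thread. A small sketch:

```python
import asyncio
import concurrent.futures
import threading
import time

loop = asyncio.new_event_loop()
# single-threaded default executor, as in the proposed patch
loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=1))

worker_names = set()

def record():
    # note which pool thread actually ran the blocking work
    worker_names.add(threading.current_thread().name)
    time.sleep(0.01)

async def main():
    inner = asyncio.get_running_loop()
    await asyncio.gather(*(inner.run_in_executor(None, record) for _ in range(5)))

loop.run_until_complete(main())
loop.close()
assert len(worker_names) == 1  # all five blocking calls shared one thread
```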
I understand that, and your fix w/o the `set_default_executor` call. What I am unclear/worried about is that if you don't set the default executor, the event loop will instantiate one WHEN it is needed, and I think it is not needed in most cases. If you instantiate an executor to set it as the default, then the additional single thread is there regardless of whether it is used.
|
Yes. You are right. In this case it would always instantiate a single worker thread for every concurrent job. But I suspect that there could be something that is indeed lazily instantiating these executors (likely aiofiles, implicit run_in_executor calls, or plugins). The default executor would then come with up to ~cpu_count workers. My first PR, and yours, does nothing to fix that issue (I think 😅). It just prevents them from being constantly closed and created again. Of course, if you could remove the places that instantiate the …
|
@johanneskoester I am fine with @coroa's solution to the issues on Windows. I haven't run my large pipelines with his PR yet, but I assume it would solve the problem as well. So if you like that solution better, just merge that. I am looking forward to a new version of Snakemake.
OK, I did a very loose review of how often `run_in_executor` appears in base libraries, and you are right, it's used more often than I thought. It's used by aiofiles, as well as by aiohttp. So this might launch it more often. What do you think about sharing a default `ThreadPoolExecutor` across all asyncio loops?
I don't see any problems with that, but that doesn't count for much since I am not well versed in asyncio and how the different parts play together. I think that part could also come in a follow-up PR - I am really hoping for a new Snakemake release soon 😅
|
Closing in favor of #3911. |
Interestingly I debugged the same issue as reported in #3909 last week, and came to a slightly different implementation for reusing the asynchronous loop. The benefit is that it is more explicit than the implicit reuse. The drawback is that you still have multiple async runners, since the `async_run` in [`InputFiles._predicated_size_files`](https://github.com/coroa/snakemake/blob/401c86946e72f4e2112ae393ae745dcccc2ebe91/src/snakemake/io/__init__.py#L1927-L1937) cannot access the workflow object when it is being used through [`eval_resource_expression`](https://github.com/coroa/snakemake/blob/401c86946e72f4e2112ae393ae745dcccc2ebe91/src/snakemake/resources.py#L526-L571).

### QC

* [x] The PR contains a test case for the changes or the changes are already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).

## Summary by CodeRabbit

* **Refactor**
  * Centralized asynchronous dispatch into per-workflow runners with orderly creation and teardown, replacing scattered external async calls.
* **New Features**
  * Workflow-managed async execution now drives scheduling, execution, caching, postprocessing and touch operations.
  * Resource/input/benchmark evaluation callables and input helpers can accept and propagate an async execution handle.
* **Bug Fixes**
  * Fixed resource iteration extraction and simplified an IO-file error message for clearer output.
This PR tries to fix an issue on Windows where Snakemake hangs or freezes after running for an extended period (typically after 1-6 hours, or with many jobs).

The Issue:
Snakemake's scheduler frequently calls `async_run` (wrapping `asyncio.run`) to check job status. Here is a `py-spy` dump from one of my runs that stalled after many hours. I guess that creating and destroying thousands of these loops exhausts the available TCP ports, or something similar. Once ports are exhausted, `socket.socketpair()` hangs indefinitely inside `accept()`, freezing the Snakemake process.

The Fix:
I modified `async_run` in `__init__.py` to reuse the asyncio event loop (one per local thread) when running on Windows, similar to how a long-running application would manage its loop. I am hoping this prevents the constant teardown and recreation of socket pairs.

--
I am also running some tests where I just try to change the type of event loop directly in the Snakefile. Maybe this could also be done directly in Snakemake, and maybe that would be better than reusing the event loop as this PR proposes. I am not really an expert on this, so any suggestions would be greatly appreciated.
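For illustration, "changing the type of event loop" on Windows usually means installing the selector-based policy before any loop is created. A hedged sketch (the policy class is real asyncio API; whether it avoids the socketpair churn in Snakemake is untested here):

```python
import asyncio
import sys

if sys.platform == "win32":
    # Use select()-based loops instead of the IOCP-based ProactorEventLoop;
    # the guard makes this a no-op on other platforms.
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
```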
QC

* [x] The PR contains a test case for the changes or the changes are already covered by an existing test case.
* [x] The documentation (`docs/`) is updated to reflect the changes or this is not necessary (e.g. if the change does neither modify the language nor the behavior or functionalities of Snakemake).

Summary by CodeRabbit