WIP / RFC: Remove custom ThreadPoolExecutor #6607
Conversation
```python
@contextmanager
def secede():
    worker = get_worker()
    duration = time() - thread_state.start_time
    worker.loop.add_callback(
        worker.handle_stimulus,
        SecedeEvent(
            key=thread_state.key,
            compute_duration=duration,
            stimulus_id=f"worker-client-secede-{time()}",
        ),
    )
    yield
```
This is a duplication of `worker.secede` (I didn't realize this was here). I think once #5882 is implemented we should only offer this as a context manager.
```diff
 self.executors["default"] = ThreadPoolExecutor(
-    nthreads, thread_name_prefix="Dask-Default-Threads"
+    thread_name_prefix="Dask-Default-Threads"
 )
```
The one problem/shortcoming we'd face when dropping our custom threadpool is that this would inherently limit the number of tasks that are allowed to secede to the natural threshold that `concurrent.futures.ThreadPoolExecutor` sets, which is currently `min(32, (os.cpu_count() or 1) + 4)`.

I don't think having a limit on the number of threads (including seceded ones) is necessarily a bad thing, though, and we could always make this configurable for power users.
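For illustration, here is a minimal sketch of that default limit versus passing `max_workers` explicitly. The `nthreads = 8` value is a made-up stand-in for the worker's configured thread count; the formula is the one documented for the stdlib executor since Python 3.8:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Default thread limit documented for the stdlib executor (Python 3.8+):
default_limit = min(32, (os.cpu_count() or 1) + 4)
print("implicit max_workers:", default_limit)

# Passing max_workers explicitly (here via a hypothetical `nthreads`
# value) keeps pool sizing tied to the worker's thread count instead:
nthreads = 8
executor = ThreadPoolExecutor(nthreads, thread_name_prefix="Dask-Default-Threads")
results = list(executor.map(lambda x: x * 2, range(5)))
executor.shutdown()
print(results)
```

Omitting `max_workers` is what would implicitly cap the number of seceded tasks at the formula above.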
This would conflict with #4726.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0   15 suites ±0   7h 13m 9s ⏱️ -3m 41s

For more details on these failures and errors, see this check.

Results for commit 91b41d5. ± Comparison against base commit e6cc40a.
Another shortcoming of just using the stdlib threadpool is that the stdlib threadpool's threads are not daemonic and would therefore block interpreter shutdown. I'm not fully convinced that this is a blocker, since in most/all proper deployment situations we'd still have a nanny that could kill the process externally if the worker itself refuses to close.
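The daemon-flag distinction can be sketched as follows; the sleep and thread targets are illustrative only (in recent CPython versions the stdlib executor's threads are non-daemonic and are joined at interpreter exit):

```python
import threading
import time

def long_running():
    time.sleep(0.1)  # stand-in for a task that could outlive shutdown

# A daemon thread does not block interpreter exit; a non-daemon thread
# (like those concurrent.futures.ThreadPoolExecutor spawns) is waited on.
daemon_t = threading.Thread(target=long_running, daemon=True)
normal_t = threading.Thread(target=long_running)
daemon_t.start()
normal_t.start()

print("daemon thread blocks exit:", not daemon_t.daemon)  # False
print("normal thread blocks exit:", not normal_t.daemon)  # True
normal_t.join()
```

This is why a hung non-daemonic pool thread can keep the process alive even after the worker has tried to close.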
I believe there is no need to have a custom threadpool with the ability to "secede" from it, i.e. to adjust the allowed thread count while running. This functionality causes confusion for users and it doesn't work as intended.
Right now we have two mechanisms in place to control the number of running tasks. There is the threadpool limit, which is by default set to `Worker.state.nthreads`, and there is our state machine, which controls how many tasks are allowed to be in the state `executing`, i.e. how many tasks have been submitted to the executor without having returned.

Using `worker.secede` performs two actions:

1. It transitions the task to `long-running`, which tells the worker state that this currently executing task should not count towards the limit `nthreads`.
2. It secedes the current thread from the threadpool, freeing up a slot so another thread can be started.

Only one of these mechanisms is required to fulfill our requirements.
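The threadpool side of that mechanism can be sketched with a toy pool (hypothetical names, not Dask's `ThreadPoolExecutor`): a seceding task spawns a replacement thread, and the seceding thread leaves the pool's accounting once its task finishes.

```python
import queue
import threading

class SecedingPool:
    """Toy fixed-size pool where a running task may "secede":
    its thread hands its slot to a freshly spawned replacement."""

    def __init__(self, nthreads):
        self._tasks = queue.Queue()
        self._local = threading.local()
        for _ in range(nthreads):
            self._spawn()

    def _spawn(self):
        threading.Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        while True:
            fn = self._tasks.get()
            self._local.seceded = False
            fn()
            if self._local.seceded:
                return  # a replacement thread already took over this slot

    def submit(self, fn):
        self._tasks.put(fn)

    def secede(self):
        # Called from inside a running task: free the slot for others.
        self._local.seceded = True
        self._spawn()

# Demo: a single-thread pool still makes progress on a second task
# while the first one secedes and blocks.
pool = SecedingPool(1)
short_ran = threading.Event()
long_done = threading.Event()
out = []

def long_task():
    pool.secede()      # give up our slot before blocking
    short_ran.wait(5)  # simulate a long-running wait
    out.append("long")
    long_done.set()

def short_task():
    out.append("short")
    short_ran.set()

pool.submit(long_task)
pool.submit(short_task)
long_done.wait(5)
print(out)
```

Without the `secede()` call, the single pool thread would block in `long_task` forever and `short_task` would never run; this is the concurrency-bump behaviour that the worker-state `long-running` transition alone could also provide.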
If we look further into how this mechanism can be used incorrectly, we notice that there is no `worker.rejoin` method (see #5882) and there is no actual way to re-enter the threadpool (in a safe way) and update the worker state at the same time. Users may resort to `threadpoolexecutor.rejoin`, which re-enters the thread into the threadpool but does not re-enter the worker state by transitioning the task back to `executing`. This causes the threadpool to be saturated even though the number of tasks in state `executing` that are actually running on the threadpool is smaller than `Worker.nthreads`, i.e. `Worker.execute` will actually block without submitting any tasks to the executor.

This mistake is surprisingly easy to make since we import `worker.secede` but `threadpoolexecutor.rejoin` in our top-level `__init__.py`.

A similar problem occurs if users use `threadpool.secede` instead of `worker.secede`. While this actually frees up a slot on the threadpool, the worker would never transition any more tasks, so we wouldn't even increase concurrency, i.e. this would not have any effect. This import is not exposed top-level.

The bottom line is that we should definitely not expose `threadpool.secede` or `threadpool.rejoin` in the public API, since their usage is most likely wrong unless people are using our threadpool without using distributed and know exactly what they are doing. At the same time, updating the threadpool to a more recent version has been shown to be difficult (#5893).

If nothing surprising pops up on CI, I would suggest dropping this entirely.