
WIP / RFC Remove custom threadpoolexecutor #6607

Closed
fjetter wants to merge 2 commits into dask:main from fjetter:remove_custom_threadpool_executor

Conversation


@fjetter fjetter commented Jun 22, 2022

I believe there is no need to actually have a custom threadpool and the ability to "secede" from the threadpool, i.e. to adjust the allowed thread count while running.

I believe this functionality causes confusion for users and it doesn't work as intended.

Right now we have two mechanisms in place to control the number of running tasks. There is the threadpool limit, which is set to Worker.state.nthreads by default, and there is our state machine, which controls how many tasks are allowed to be in state executing, i.e. how many tasks have been submitted to the executor without having returned yet.

Calling worker.secede performs two actions:

  1. Secede from the threadpool, i.e. increase the number of allowed threads in the threadpool by one. In fact, it removes the current thread from the threadpool tracking and likely causes it to leak.
  2. Transition the task to state long-running which tells the worker state that this currently executing task should not count towards the limit nthreads

Only one of these mechanisms is required to fulfill our requirements for

  • Limit concurrently running tasks
  • Allow targeted, manual override of running tasks by seceding the current task
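A single counter covers both requirements. The sketch below is purely illustrative (TaskSlots and its methods are hypothetical names, not distributed's actual state machine): a bounded semaphore limits concurrently executing tasks, and seceding simply releases a slot, with no surgery on the thread pool itself.

```python
import threading


class TaskSlots:
    """Hypothetical sketch: one counter is enough to limit how many
    tasks execute concurrently."""

    def __init__(self, nthreads):
        self._slots = threading.BoundedSemaphore(nthreads)

    def try_acquire(self):
        # A task must hold a slot to enter state "executing".
        return self._slots.acquire(blocking=False)

    def secede(self):
        # Transition to "long-running": free the slot so another task
        # may start; this thread keeps running, the pool is untouched.
        self._slots.release()


slots = TaskSlots(nthreads=2)
assert slots.try_acquire() and slots.try_acquire()
assert not slots.try_acquire()  # at capacity
slots.secede()                  # a long-running task frees its slot
assert slots.try_acquire()      # a new task can start immediately
```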

If we look further into how this mechanism can be used incorrectly, we notice that there is no worker.rejoin method (see #5882), i.e. there is no safe way to re-enter the threadpool and update the worker state at the same time. Users may resort to threadpoolexecutor.rejoin, which re-enters the thread into the threadpool but does not transition the task back to executing in the worker state. The threadpool can then already be saturated even though the number of tasks in state executing that are actually running on the threadpool is smaller than Worker.nthreads, i.e. Worker.execute will block without submitting any tasks to the executor.

This mistake is surprisingly easy to make since we export worker.secede but threadpool.rejoin in our top-level __init__.py

A similar problem occurs if users call threadpool.secede instead of worker.secede. While this frees up a slot on the threadpool, the worker would never transition any more tasks, so concurrency would not actually increase, i.e. this has no effect at all. This import is not exposed at the top level.
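A toy model of why both misuses are no-ops or worse: effective concurrency is the minimum of the pool's slots and the state machine's executing slots, so raising only one of the two limits changes nothing. The function name below is hypothetical, purely for illustration.

```python
def effective_concurrency(pool_slots, executing_slots):
    # Toy model: a task runs only when both a pool thread and an
    # "executing" slot in the worker state are available.
    return min(pool_slots, executing_slots)


nthreads = 4

# threadpool.secede alone: the pool gains a slot, but the state
# machine is unchanged, so nothing actually runs in parallel extra.
assert effective_concurrency(nthreads + 1, nthreads) == 4

# worker.secede: both the pool and the state machine free a slot,
# so one additional task can run.
assert effective_concurrency(nthreads + 1, nthreads + 1) == 5
```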

The bottom line is that we should definitely not expose threadpool.secede or threadpool.rejoin to the public API since the usage is most likely wrong unless people are using our threadpool without using distributed and know exactly what they are doing.

At the same time, updating the threadpool to a more recent version has been shown to be difficult (#5893)

If nothing surprising pops up on CI, I would suggest dropping this entirely

Comment on lines +54 to +67
@contextmanager
def secede():
    worker = get_worker()
    duration = time() - thread_state.start_time
    worker.loop.add_callback(
        worker.handle_stimulus,
        SecedeEvent(
            key=thread_state.key,
            compute_duration=duration,
            stimulus_id=f"worker-client-secede-{time()}",
        ),
    )

    yield
Member Author


This is a duplication of worker.secede (I didn't realize it was there).
I think once #5882 is implemented we should only offer this as a context manager

Comment on lines 662 to 665
self.executors["default"] = ThreadPoolExecutor(
-    nthreads, thread_name_prefix="Dask-Default-Threads"
+    thread_name_prefix="Dask-Default-Threads"
)

Member Author


The one problem/shortcoming we'd face when dropping our custom threadpool is that this would inherently limit the number of tasks allowed to secede to the natural threshold concurrent.futures.ThreadPoolExecutor sets, which is currently min(32, (os.cpu_count() or 1) + 4).
I don't think a limit on the number of threads (including seceded ones) is necessarily a bad thing, though, and we could always make it configurable for power users
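For reference, the stdlib default is easy to check directly. A small sketch (note that `_max_workers` is a CPython-private attribute, inspected here only for illustration):

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Default thread count of the stdlib executor since Python 3.8.
expected = min(32, (os.cpu_count() or 1) + 4)

ex = ThreadPoolExecutor()
assert ex._max_workers == expected  # _max_workers: CPython-private
ex.shutdown()

# The cap is trivially configurable for power users:
big = ThreadPoolExecutor(max_workers=expected + 16)
assert big._max_workers == expected + 16
big.shutdown()
```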


fjetter commented Jun 22, 2022

This would conflict with #4726

@github-actions

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    15 files ±0     15 suites ±0    7h 13m 9s ⏱️ -3m 41s
 2 870 tests  -9     2 779 ✔️  -10     84 💤 ±0    6 ❌ +1    1 🔥 ±0
21 267 runs   -64   20 281 ✔️  -78    975 💤 +9    9 ❌ +4    2 🔥 +1

For more details on these failures and errors, see this check.

Results for commit 91b41d5. ± Comparison against base commit e6cc40a.


fjetter commented Nov 16, 2022

Another shortcoming of just using the stdlib threadpool is that the stdlib threadpool's threads are not daemonic and would therefore block interpreter shutdown. I'm not fully convinced that this is a blocker since in most/all proper deployment situations we'd still have a nanny that could kill the process externally if the worker itself refuses to close
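This is easy to verify: since Python 3.9 (bpo-39812) the stdlib executor spawns regular, non-daemon worker threads. A quick check (`_threads` is a CPython-private attribute, used here only for inspection):

```python
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=1)
ex.submit(lambda: None).result()  # force a worker thread to spawn

# Non-daemon threads keep the interpreter alive at shutdown, so a
# hung task would block exit unless something external (e.g. the
# nanny) kills the process.
worker_thread = next(iter(ex._threads))  # _threads: CPython-private
assert not worker_thread.daemon
ex.shutdown()
```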
