
[WIP] Update threadpool#5893

Closed
fjetter wants to merge 4 commits into dask:main from fjetter:update_threadpool

Conversation

@fjetter
Member

@fjetter fjetter commented Mar 3, 2022

This updates the threadpool to the most recent version, as currently found in Python 3.10.2. It required a few compatibility adjustments to make it work for Python 3.8, but nothing major, imo.

Incorporating secede/rejoin was a bit trickier, and I'm not entirely sure whether I introduced a bug around the idle counter/semaphore when a thread rejoins. However, if I'm not mistaken, a miscount in this semaphore would only remove the optimization of not spawning additional threads while idle workers exist, which I don't consider a big deal.

I decided to remove the shutdown timeout that was introduced in #1330:

  • The tests never actually worked: they test a lower limit for the shutdown time, but all tests would pass even with the timeout removed. They should instead have checked an upper threshold.
  • I am not entirely convinced this is necessary; it may even be harmful. Afaik, the Python interpreter will not shut down until all non-daemon threads have finished, so timing out the shutdown would never close a worker faster in a real application than waiting for the shutdown to finish properly.
  • In tests, this can mask flaws in our test suite and will almost certainly leak threads that run concurrently with other tests. We should avoid this as much as possible. The worst case is that the threads never finish and we hit the pytest-timeout; I suspect such problems would be straightforward to debug.
  • The original issue mentions a memory leak, but I cannot see how that relates to the shutdown.

If any of the above reasons are wrong, or I missed anything, adding the shutdown timeout back should be easy enough.

cc @graingert
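On the shutdown-timeout point above: `ThreadPoolExecutor.shutdown(wait=True)` already blocks until in-flight work completes, which is what makes a separate timeout redundant in a real application. A minimal sketch of that behaviour (the helper name and timings are illustrative, not the PR's code):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def shutdown_waits_for_tasks():
    """Show that shutdown(wait=True) returns only after queued work finishes."""
    pool = ThreadPoolExecutor(max_workers=1)
    pool.submit(time.sleep, 0.2)   # work still running when we shut down
    start = time.monotonic()
    pool.shutdown(wait=True)       # blocks until the sleep completes
    return time.monotonic() - start

elapsed = shutdown_waits_for_tasks()
assert elapsed >= 0.15             # most of the 0.2 s sleep was waited out
```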

- executor._threads.remove(threading.current_thread())
+ executor._threads.discard(threading.current_thread())
  rejoin_event.set()
  executor._idle_semaphore.acquire(timeout=0)
Member Author


This is where I'm not entirely sure we're doing the right thing. If I do not acquire, we deadlock right away. Acquiring too often would cause the counter to underestimate the number of idle threads and spawn more than necessary, so it should not be a huge deal.
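For reference, `threading.Semaphore.acquire(timeout=0)` is a non-blocking try-acquire: it returns `False` immediately instead of waiting, which is why this call itself cannot deadlock. A sketch of those semantics (the variable name mirrors the stdlib's `_idle_semaphore` but is illustrative):

```python
import threading

# An idle-thread counter in the style of the executor's _idle_semaphore.
idle = threading.Semaphore(0)

# timeout=0 never blocks: with no permit available it returns False
# immediately rather than waiting for a release.
assert idle.acquire(timeout=0) is False

idle.release()                           # one thread reports itself idle
assert idle.acquire(timeout=0) is True   # permit consumed without blocking
assert idle.acquire(timeout=0) is False  # counter is back at zero
```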

Member Author


Deadlocks happen in test_rejoin_idempotent. I didn't have the patience to debug this any further, since I think it should not be a big deal.

self._initargs,
),
)
t.daemon = True
Member

@graingert graingert Mar 3, 2022


This is probably the biggest change in the PR: https://bugs.python.org/issue39812

We might start seeing some processes stick around, waiting for deadlocked executors.

Member Author


Ah, I missed this one. That explains why the shutdown timeout was "harmless".

Member Author


We probably "require" this to be a daemon thread. Real-world user functions may run for hours, but I don't think this should block a worker close indefinitely.
#4726 is also relevant in this context.
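The trade-off in the comments above is whether interpreter exit waits for in-flight work: non-daemon threads block exit until they finish, while daemon threads are abandoned. A small sketch of the difference, run in a child interpreter (the script contents are illustrative):

```python
import subprocess
import sys
import textwrap

# A child interpreter starts a thread sleeping for 60 s and then exits.
# With daemon=True the interpreter does not wait for the thread; with
# daemon=False the same script would block for the full minute.
script = textwrap.dedent(
    """
    import threading, time
    t = threading.Thread(target=time.sleep, args=(60,), daemon=True)
    t.start()
    print("exiting")
    """
)

result = subprocess.run(
    [sys.executable, "-c", script],
    capture_output=True, text=True, timeout=10,
)
assert result.stdout.strip() == "exiting"
assert result.returncode == 0
```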

# We use cpu_count + 4 for both types of tasks.
# But we limit it to 32 to avoid consuming surprisingly large resource
# on many core machine.
max_workers = min(32, (os.cpu_count() or 1) + 4)
Member


This is another change I was concerned about, but Dask sets this explicitly here:

self.executors["default"] = ThreadPoolExecutor(
self.nthreads, thread_name_prefix="Dask-Default-Threads"
)
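The stdlib default quoted above only applies when no worker count is given; since Dask always passes self.nthreads, the cap never takes effect for the default executor. The formula as a pure function (the function name is illustrative):

```python
def default_max_workers(cpu_count):
    # ThreadPoolExecutor's default since Python 3.8: cpu_count + 4,
    # capped at 32 so many-core machines do not get huge pools.
    # os.cpu_count() can return None, hence the "or 1" fallback.
    return min(32, (cpu_count or 1) + 4)

assert default_max_workers(4) == 8
assert default_max_workers(64) == 32   # the cap kicks in
assert default_max_workers(None) == 5  # cpu_count() returned None
```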

raise RuntimeError("cannot schedule new futures after shutdown")
if _shutdown:
raise RuntimeError(
"cannot schedule new futures after " "interpreter shutdown"
Member


https://github.com/keisheiled/flake8-implicit-str-concat

Suggested change
"cannot schedule new futures after " "interpreter shutdown"
"cannot schedule new futures after interpreter shutdown"

@fjetter
Member Author

fjetter commented Mar 3, 2022

Well, the Windows tests all run into hard timeouts, so this will be interesting and will likely take a bit of time to sort through.

@github-actions
Contributor

github-actions bot commented Mar 3, 2022

Unit Test Results

4 files (-8), 2 suites (-10), 2 errors, 0s duration (-6h 53m 23s)
511 tests (-2,110): 471 passed (-2,070), 29 skipped (-51), 11 failed (+11)
1,022 runs (-14,628): 933 passed (-13,854), 70 skipped (-793), 19 failed (+19)

For more details on these parsing errors and failures, see this check.

Results for commit 4d3b1fb. ± Comparison against base commit 8c98ad8.

@jakirkham
Member

Wonder if at some point it makes sense to get the changes we need in ThreadPoolExecutor into CPython itself.
