Allow passing kwds to ProcessPool#252
Conversation
|
I hate to admit it... but there's a PR (#198) that's been open for a while on |
|
it would be cool to merge both PRs, but agree that it would be cool to test (for both PRs) that behaviour changes when some kwarg is passed. apart from tests, is there anything else to do before merging? |
4a8782a to
b6bdb3c
Compare
|
added test |
|
@mmckerns anything from my side still? |
|
This is good... it's on my shortlist to test and review. |
There was a problem hiding this comment.
Most of these changes are fine. However, the code in _serve needs work. Essentially, a Pool instance gets cached in __STATE, and will be reused unless you make a change to the Pool configuration. So, if you call Pool(4) then you call Pool(4, maxtasksperchild=2)... then as is, your code won't spawn an new pool with maxtasksperchild=2 (because of line 117). Basically, if the nodes or any kwds change, you need to make sure it instantiates a new Pool.
You should also make the same changes for ThreadPool.
| """Create a new server if one isn't already initialized""" | ||
| if nodes is None: nodes = self.__nodes | ||
| _pool = __STATE.get(self._id, None) | ||
| if not _pool or nodes != _pool.__nodes: |
There was a problem hiding this comment.
needs to also check _pool._maxtasksperchild, _pool._initargs, _pool._initializer
There was a problem hiding this comment.
| if not _pool or nodes != _pool.__nodes: | |
| if ( | |
| _pool is None | |
| or nodes != _pool.__nodes | |
| or kwds.get('maxtasksperchild') != _pool._maxtasksperchild | |
| or kwds.get('initargs') != _pool._initargs | |
| or kwds.get('initializer') != _pool._initializer | |
| ): |
like this?
There was a problem hiding this comment.
it's missing the leading underscores in the latter two.
There was a problem hiding this comment.
👍 edited the comment
There was a problem hiding this comment.
I believe you'll also need: (1) corresponding changes to _clear, and (2) when _serve is called by one of the map functions and kwds={}, then it does the expected thing by pulling the kwds from the existing pool.
There was a problem hiding this comment.
Added 5bdd7af which should take care of it: only add a new pool to state if kwds changed, and only clear a pool from state if kwds match. The (2) part I didn't fully get: pool kwds are e.g. initializer, whereas map kwds are e.g. chunksize. I think it should be OK with this last commit, as the is no overlap between those two kinds of kwds?
There was a problem hiding this comment.
correct, there is no overlap between map and pool kwds.
|
|
||
| # test ProcessPool keyword argument propagation | ||
| pool.clear() | ||
| pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6)) |
There was a problem hiding this comment.
| pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6)) | |
| pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6)) | |
| assert pool._pool.initializer, 'Subsequent pool with different kwds should propagate' |
right? if not propagated, default initializer should be falsely?
wondering why this test actually takes 0.6+ seconds, doesn't that mean propagation is working? 🤔
There was a problem hiding this comment.
Default initializer is None, I believe. You can check the defaults with:
>>> from pathos.pools import _ProcessPool as Pool
>>> p = Pool()
>>> p._initializer
and so on.
There was a problem hiding this comment.
is there a state bleed somewhere? because by your theory, my test should be failing. as the test does exactly what you described: first a plain pool, then a pool with initializer. by your theory, that initializer should be ignored and old pool reused. but by the test, the map now takes much longer. so apparently the initializer is propagated?
There was a problem hiding this comment.
I don't think you've made all the necessary edits. See comments above.
There was a problem hiding this comment.
sure, I'll still make the changes, but I still would like to understand why the test is showing the expected results. L13 populates the state, and L33 calls ProcessPool() again but with sleep initializer. And then time.monotonic shows that the call to map now indeed takes longer due to the sleep. So why does it currently work, if you say it should not work?
There was a problem hiding this comment.
It was probably clearing the cached pool because _serve was being called with empty kwds in the map call (and the proper state handling wasn't done correctly).
mmckerns
left a comment
There was a problem hiding this comment.
LGTM. Needs documentation, but I'll add that.
|
super! thanks for the review 👍 |
|
I also handled if someone passes |
Hi 👋
I would like to propagate
maxtasksperchildkeyword, but for this I had to switch frompathostomultiprocessref ddelange/mapply#29.This however decreases stability/cleanup of workers, so rather allow a pathos user to propagate it :)