Skip to content

Allow passing kwds to ProcessPool#252

Merged
mmckerns merged 5 commits into
uqfoundation:masterfrom
ddelange:patch-1
Dec 22, 2022
Merged

Allow passing kwds to ProcessPool#252
mmckerns merged 5 commits into
uqfoundation:masterfrom
ddelange:patch-1

Conversation

@ddelange

Copy link
Copy Markdown
Contributor

Hi 👋

I would like to propagate maxtasksperchild keyword, but for this I had to switch from pathos to multiprocess ref ddelange/mapply#29.

This however decreases stability/cleanup of workers, so rather allow a pathos user to propagate it :)

@ddelange

Copy link
Copy Markdown
Contributor Author

hi @mmckerns 👋 thanks for linking the open issues!

while we're at it, is there currently a way to propagate chunksize? since keyword arguments to imap are swallowed, I'm not sure wether passing it currently will be respected 🤔

@mmckerns

Copy link
Copy Markdown
Member

I hate to admit it... but there's a PR (#198) that's been open for a while on chunksize and I haven't built a test set for it to see what will happen.

@ddelange

Copy link
Copy Markdown
Contributor Author

it would be cool to merge both PRs, but agree that it would be cool to test (for both PRs) that behaviour changes when some kwarg is passed.

apart from tests, is there anything else to do before merging?

@ddelange ddelange force-pushed the patch-1 branch 3 times, most recently from 4a8782a to b6bdb3c Compare September 29, 2022 17:22
@ddelange

Copy link
Copy Markdown
Contributor Author

added test

@ddelange

ddelange commented Oct 5, 2022

Copy link
Copy Markdown
Contributor Author

@mmckerns anything from my side still?

@mmckerns

mmckerns commented Oct 5, 2022

Copy link
Copy Markdown
Member

This is good... it's on my shortlist to test and review.

@mmckerns mmckerns left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these changes are fine. However, the code in _serve needs work. Essentially, a Pool instance gets cached in __STATE, and will be reused unless you make a change to the Pool configuration. So, if you call Pool(4) then you call Pool(4, maxtasksperchild=2)... then as is, your code won't spawn an new pool with maxtasksperchild=2 (because of line 117). Basically, if the nodes or any kwds change, you need to make sure it instantiates a new Pool.

You should also make the same changes for ThreadPool.

Comment thread pathos/multiprocessing.py Outdated
"""Create a new server if one isn't already initialized"""
if nodes is None: nodes = self.__nodes
_pool = __STATE.get(self._id, None)
if not _pool or nodes != _pool.__nodes:

@mmckerns mmckerns Dec 19, 2022

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to also check _pool._maxtasksperchild, _pool._initargs, _pool._initializer

@ddelange ddelange Dec 20, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not _pool or nodes != _pool.__nodes:
if (
_pool is None
or nodes != _pool.__nodes
or kwds.get('maxtasksperchild') != _pool._maxtasksperchild
or kwds.get('initargs') != _pool._initargs
or kwds.get('initializer') != _pool._initializer
):

like this?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's missing the leading underscores in the latter two.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 edited the comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you'll also need: (1) corresponding changes to _clear, and (2) when _serve is called by one of the map functions and kwds={}, then it does the expected thing by pulling the kwds from the existing pool.

@ddelange ddelange Dec 21, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added 5bdd7af which should take care of it: only add a new pool to state if kwds changed, and only clear a pool from state if kwds match. The (2) part I didn't fully get: pool kwds are e.g. initializer, whereas map kwds are e.g. chunksize. I think it should be OK with this last commit, as the is no overlap between those two kinds of kwds?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, there is no overlap between map and pool kwds.

Comment thread pathos/tests/test_mp.py

# test ProcessPool keyword argument propagation
pool.clear()
pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6))

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6))
pool = ProcessPool(nodes=4, initializer=lambda: time.sleep(0.6))
assert pool._pool.initializer, 'Subsequent pool with different kwds should propagate'

right? if not propagated, default initializer should be falsely?

wondering why this test actually takes 0.6+ seconds, doesn't that mean propagation is working? 🤔

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default initializer is None, I believe. You can check the defaults with:

>>> from pathos.pools import _ProcessPool as Pool
>>> p = Pool()
>>> p._initializer

and so on.

@ddelange ddelange Dec 21, 2022

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a state bleed somewhere? because by your theory, my test should be failing. as the test does exactly what you described: first a plain pool, then a pool with initializer. by your theory, that initializer should be ignored and old pool reused. but by the test, the map now takes much longer. so apparently the initializer is propagated?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you've made all the necessary edits. See comments above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I'll still make the changes, but I still would like to understand why the test is showing the expected results. L13 populates the state, and L33 calls ProcessPool() again but with sleep initializer. And then time.monotonic shows that the call to map now indeed takes longer due to the sleep. So why does it currently work, if you say it should not work?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was probably clearing the cached pool because _serve was being called with empty kwds in the map call (and the proper state handling wasn't done correctly).

@ddelange ddelange requested a review from mmckerns December 22, 2022 11:57

@mmckerns mmckerns left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Needs documentation, but I'll add that.

@mmckerns mmckerns merged commit dfd15d0 into uqfoundation:master Dec 22, 2022
@mmckerns mmckerns added this to the pathos-0.3.1 milestone Dec 22, 2022
@ddelange

Copy link
Copy Markdown
Contributor Author

super! thanks for the review 👍

@mmckerns

Copy link
Copy Markdown
Member

I also handled if someone passes processes in kwds. Seems to be done now. Thanks for the PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

suggest pathos support initializer parameter for pathos.multiprocessing.ProcessPool Support for maxtasksperchild

2 participants