feat: multiprocessing to improve performance when fitting many curves #3607
```python
if workers > 1 and len(map_over) > 0:
    max_size = max((da.sizes[dim] for dim in map_over))
    max_size_dim = next((dim for dim in map_over if da.sizes[dim] == max_size))
    pardim = max_size_dim if max_size > 1 else None
```
Here the dimension to slice into chunks and parallelize over is decided.
The dimension with the largest size is used because that gives us a more even load on the separate processes.
We might want to add some kind of heuristic for when not to use multiprocessing and instead rely on the built-in thread parallelism of scipp.
A heuristic that seems reasonable from experiments (on my laptop) is to switch when each fit grows larger than 5000 points per core. Are there other suggestions for heuristics to use?
Update: a heuristic has been added
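As a sketch, such a heuristic could look like the following (the function name and exact condition are assumptions based on the 5000-points-per-core observation above, not the code actually merged):

```python
# Hypothetical heuristic sketch: prefer multiprocessing only for many
# small fits; large fits are better served by scipp's built-in threading.
POINTS_PER_CORE_THRESHOLD = 5000  # from the laptop experiments mentioned above


def should_use_multiprocessing(fit_size: int, n_fits: int, workers: int) -> bool:
    # Many small independent fits benefit from process parallelism;
    # a few large fits do not amortize the process start-up overhead.
    return n_fits > 1 and fit_size < POINTS_PER_CORE_THRESHOLD * workers
```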
In #3603 (comment) you showed that using multi-processing is slower in some cases, and provides a relatively minor speedup in other cases. Are you doing something different here? Did you get to the bottom of why the speedup is so marginal? Are you avoiding slowdowns?
Note that in the figure a lower y-value (runtime) is better. The benchmark was made on a computer with 4 cores, so a speedup factor of more than 4 is not expected, and that is close to what's in the figure for small fit sizes. When the fit size grows, the overhead of multiprocessing becomes relatively larger.

It's good to think about the impact in terms of absolute time. For large fit sizes the routine is already fast, so a relative slowdown there is less important than the relative speedup for small fit sizes. (Neither 100k nor 1M is particularly large in this context, considering a 1d fit over a 2d grid could easily involve 100M points.) Note that now the benchmark is run on a workstation with 12 cores.

[Figures: benchmark results for 100k points and 1M points]

It's clear from the figures that the multiprocessing has an overhead of about ~0.1 seconds, and if the fit is fast that becomes a relative slowdown. In most use cases that's not relevant, but the speedup from seconds to sub-seconds for low fit sizes is.
Force-pushed from 0648eb4 to 2358443
Hi all, […]
Thank you for trying it out, it's much appreciated! Please write here with the traceback, or feel free to open an issue, if you encounter the problem again. It's hard to tell what might've gone wrong without more context. But note that the try-except on line 307 only catches the exception raised when …
Copilot reviewed 3 out of 14 changed files in this pull request and generated 2 suggestions.
Files not reviewed (11)
- requirements/asv.txt: Language not supported
- requirements/base.in: Language not supported
- requirements/base.txt: Language not supported
- requirements/build.txt: Language not supported
- requirements/ci.txt: Language not supported
- requirements/coverage.txt: Language not supported
- requirements/docs.txt: Language not supported
- requirements/extra.txt: Language not supported
- requirements/mypy.txt: Language not supported
- requirements/static.txt: Language not supported
- requirements/test.txt: Language not supported
Comments skipped due to low confidence (1)
src/scipp/curve_fit.py:245
- [nitpick] The function name _curve_fit_chunk is not very descriptive. It could be renamed to _parallel_curve_fit_chunk to better reflect its purpose in the context of multiprocessing.
def _curve_fit_chunk(
```python
workers = psutil.cpu_count(logical=False) if workers is None else workers
try:
    pickle.dumps(f)
except (AttributeError, pickle.PicklingError) as err:
```
Copilot AI · Nov 20, 2024
The error message 'The provided fit function is not pickleable and can not be used with the multiprocessing module.' could be more specific by including the name of the function that caused the error.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
pyproject.toml
Outdated
```toml
requires-python = ">=3.10"
dependencies = [
    "numpy >= 1.20",
    "psutil",
```
I am not comfortable adding this as a new required dependency for all of Scipp. Can we do without it?
We could fall back to using os.process_cpu_count (https://docs.python.org/3/library/os.html#os.process_cpu_count) if psutil is not installed.
The drawback is that os.process_cpu_count returns the number of logical cores, but for best performance we want the number of physical cores.
Maybe falling back to os.process_cpu_count() // 2 is a good solution?
How about using the value of the workers argument, then the user can decide?
Yeah maybe that's just easiest. I was thinking it's good to have an option to "use all cores", but probably it's not necessary, especially since we're making the multiprocessing opt-in anyway.
src/scipp/curve_fit.py
Outdated
```python
def _serialize_variable(v):
    return (v.dims, v.values, v.variances, str(v.unit) if v.unit is not None else None)


def _serialize_mapping(v):
    return (tuple(v.keys()), tuple(map(_serialize_variable, v.values())))


def _serialize_bounds(v):
    return (
        tuple(v.keys()),
        tuple(tuple(map(_serialize_variable, pair)) for pair in v.values()),
    )


def _serialize_data_array(da):
    return (
        _serialize_variable(da.data),
        _serialize_mapping(da.coords),
        _serialize_mapping(da.masks),
    )
```
Can't this use https://scipp.github.io/generated/modules/scipp.compat.dict.to_dict.html#scipp.compat.dict.to_dict?
Or maybe even https://github.com/scipp/scipp/blob/main/src/scipp/serialization.py, so pickle is not needed?
A previous version of this PR used save_hdf5 to a bytesIO stream and sent that. But it was slower than the current solution. I guess the hdf5 serialization is expensive?
I'll have a look at compat.to_dict and see if that can be used instead 👍
Using the to_dict method ran into: `TypeError: cannot pickle 'scipp._scipp.core.Unit' object` and `TypeError: cannot pickle 'scipp._scipp.core.DType' object`.
With some tweaks it could probably be made to work anyway, but I don't think the resulting code would be shorter than it is now. The bounds serialization would have to be custom anyway.
src/scipp/curve_fit.py
Outdated
```python
    return left, right


def _select_data_params_and_bounds(sel, da, p0, bounds):
```
Missing type hints.
```python
    map_over,
    unsafe_numpy_f,
    kwargs,
):
```
Missing type hints.
Is it really useful to add type hints to helper functions? I typically don't do that.
Or do you mean it would be good to have for this particular function but not for the other helper functions in this module?
Depends. It can help the reader (reviewer) a lot.
Added now 👍
tests/curve_fit_test.py
Outdated
```python
def test_with_nonpickle_function():
    da = array2d(a=1.2, b=1.3, noise_scale=0.01)

    def local_function(x, a, b):
        return func(x, a, b)

    with pytest.raises(ValueError, match='The provided fit function is not'):
        curve_fit(['x'], local_function, da)

    with pytest.raises(ValueError, match='The provided fit function is not'):
        curve_fit(['x'], lambda x, a, b: func(x, a, b), da)

    # Explicitly disabling multiprocessing lets you use a local function
    curve_fit(['x'], local_function, da, workers=1)
    # Function in __main__ scope works
    curve_fit(['x'], func, da)
```
I am not sure this is good default behavior. We should make this work by default, either by not multiprocessing if it cannot be pickled, or by disabling multi-processing by default, making it opt-in instead of opt-out.
Making it opt-in sounds like the best solution.
Enabling multiprocessing by default but silently disabling it if the function cannot be pickled will lead to surprising performance regressions when someone replaces a main scope function by a lambda or something. So in my opinion that's not the best option.
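The picklability constraint can be demonstrated without scipp: module-level functions pickle by reference, while local functions and lambdas do not, which is exactly the situation the try-except around `pickle.dumps(f)` detects:

```python
import pickle


def module_level(x):
    # Module-level functions are pickled by reference (module + name).
    return x * 2


def make_local():
    def local(x):
        return x * 2
    return local


# A module-level function serializes fine.
blob = pickle.dumps(module_level)

# Local functions and lambdas fail; the exact exception type varies
# between Python versions, hence catching both.
results = []
for fn in (make_local(), lambda x: x * 2):
    try:
        pickle.dumps(fn)
        results.append(True)
    except (AttributeError, pickle.PicklingError):
        results.append(False)
```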
```python
par, cov = _curve_fit_chunk(
    coords,
    f,
    da,
    p0,
    bounds,
    map_over,
    unsafe_numpy_f,
    kwargs,
)
```
I think we should use keyword args here.
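For illustration, the suggested keyword-argument style might look like this; the helper is stubbed here, since the real `_curve_fit_chunk` lives in `src/scipp/curve_fit.py`:

```python
# Stub standing in for the real _curve_fit_chunk; only the call style matters.
def _curve_fit_chunk(coords, f, da, p0, bounds, map_over, unsafe_numpy_f, kwargs):
    return "par", "cov"  # placeholder return values


# Keyword arguments make the long positional call self-documenting
# and robust against reordering of the parameters.
par, cov = _curve_fit_chunk(
    coords={},
    f=lambda x, a, b: a * x + b,
    da=None,
    p0={},
    bounds={},
    map_over=("y",),
    unsafe_numpy_f=False,
    kwargs={},
)
```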
src/scipp/curve_fit.py
Outdated
```python
par, cov = _datagroup_outputs(da, p0, map_over, par, cov)
return par, cov
```
I suppose par and cov have different types before and after. Mypy would not like that, so better to just return without naming the return values of _datagroup_outputs?
Yes, before they are Numpy arrays and after they are scipp DataGroup.
> just return without naming the return values of _datagroup_outputs
Sounds good!
```python
max_size = max((da.sizes[dim] for dim in map_over))
max_size_dim = next((dim for dim in map_over if da.sizes[dim] == max_size))
```
Remind me why we don't simply flatten the dims being mapped over? Then we would have a single, potentially much longer dim to use for parallelization?
If I understand you correctly:
Let's say we have a 2D array with dims x,y where we want to fit f(x, a, b)=a x^2 + b and map over y to obtain the parameters as functions of y: a(y) and b(y).
The suggestion is that we flatten the 2D array to 1D and then fit, but then we only get one pair of parameters a and b. This is what happens if reduce_dims contains y.
So the reason we don't flatten the dims being mapped over is that we want a different set of parameters for each entry of the dim being mapped over.
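The distinction can be sketched with plain numpy (scipp omitted; a linear least-squares solve stands in for the curve fit): mapping over y yields one parameter pair per y entry, whereas flattening x and y together would collapse everything into a single fit.

```python
import numpy as np


def f(x, a, b):
    return a * x**2 + b


x = np.linspace(0.0, 1.0, 50)
ys = np.arange(4)
# One curve per y entry, with a different 'a' for each y.
data = np.array([f(x, 1.0 + 0.1 * y, 0.5) for y in ys])  # shape (y, x)

# Mapping over y: solve an independent fit per y slice, so we recover
# a(y) and b(y) rather than a single global (a, b).
design = np.stack([x**2, np.ones_like(x)], axis=1)
params = np.array(
    [np.linalg.lstsq(design, row, rcond=None)[0] for row in data]
)  # shape (len(ys), 2): one (a, b) pair per y
```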
👍
```python
# Only parallelize if the user did not explicitly ask for a single worker
# and a suitable dimension for parallelization was found.
if workers != 1 and pardim is not None:
```
Relating to my comment in the tests regarding failing if the function cannot be pickled: this here demonstrates that this is indeed quite problematic. Say someone writes and tests some fitting code on small examples. Then pardim may be None, bypassing the pickling error. Then, when put into production, we suddenly have pardim != None and the code that worked perfectly in the tests fails.
Can we make the behavior more consistent?
That's a great point. Let's make the multiprocessing opt-in.
tests/curve_fit_test.py
Outdated
```python
def test_multiprocessing_used_if_several_workers_requested():
    da = array2d(a=1.2, b=1.3, noise_scale=0.01)

    # The point of this test is to verify that
    # the "workers" argument actually does what we expect.
    start = time.perf_counter()
    curve_fit(['x'], func, da)
    end = time.perf_counter()
    without_mp = end - start

    start = time.perf_counter()
    curve_fit(['x'], func, da, workers=100)
    end = time.perf_counter()
    with_mp = end - start

    # It looks weird that we're asserting it's much slower with multiprocessing.
    # Isn't the point to **improve** performance?
    # But here the array is so small that it doesn't make any sense to parallelize,
    # and we're requesting too many workers, so it should be very slow
    # if the "workers" argument is respected.
    assert with_mp > 10 * without_mp
```
Is this test actually useful? It may break when we want to change an implementation detail, and even now it may be flaky in ways that you have not anticipated.
If you don't think it's useful I'll remove it.


Fixes #3603
Uses `multiprocessing.pool` to parallelize curve fitting when many smaller curves are fitted. Adds an argument for controlling the number of processes to use.
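A minimal sketch of the overall approach described above, with hypothetical names and a trivial stand-in for the per-chunk fit: the data is split into chunks along one dimension and each chunk is fitted in a separate process.

```python
import multiprocessing as mp

import numpy as np


def fit_chunk(chunk):
    # Stand-in for the real per-chunk curve fit; here it just returns
    # one "parameter" (the mean) per row of the chunk.
    return [row.mean() for row in chunk]


def parallel_fit(data, workers):
    try:
        # Prefer fork where available so children need not re-import __main__.
        ctx = mp.get_context("fork")
    except ValueError:
        ctx = mp.get_context()
    # Split along the mapped dimension and fit each chunk in its own process.
    chunks = np.array_split(data, workers, axis=0)
    with ctx.Pool(workers) as pool:
        results = pool.map(fit_chunk, chunks)
    # Re-assemble the per-chunk results in order.
    return [p for chunk_result in results for p in chunk_result]


if __name__ == "__main__":
    data = np.arange(12.0).reshape(6, 2)
    print(parallel_fit(data, workers=2))
```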