Conversation

@jokasimr (Contributor) commented Nov 18, 2024:

Fixes #3603

Uses multiprocessing.Pool to parallelize curve fitting when many smaller curves are fitted.
Adds an argument for controlling the number of worker processes to use.
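For orientation, a minimal sketch of the general approach (illustrative only, not the PR's actual code; model, fit_chunk, and the synthetic data are made up): slice the data along one dimension into chunks and fit each chunk in a separate worker process.

import multiprocessing

import numpy as np
from scipy.optimize import curve_fit


def model(x, a, b):
    # Simple model with two fit parameters.
    return a * x + b


def fit_chunk(chunk):
    # Fit one curve per row of the chunk; return the fitted parameters.
    x = np.linspace(0, 1, chunk.shape[1])
    return np.stack([curve_fit(model, x, y)[0] for y in chunk])


if __name__ == '__main__':
    rng = np.random.default_rng(0)
    x = np.linspace(0, 1, 50)
    data = 2.0 * x + 1.0 + 0.01 * rng.standard_normal((100, 50))
    # One chunk per worker keeps the load even across processes.
    chunks = np.array_split(data, 4)
    with multiprocessing.Pool(processes=4) as pool:
        params = np.concatenate(pool.map(fit_chunk, chunks))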

if workers > 1 and len(map_over) > 0:
    max_size = max((da.sizes[dim] for dim in map_over))
    max_size_dim = next((dim for dim in map_over if da.sizes[dim] == max_size))
    pardim = max_size_dim if max_size > 1 else None
Contributor Author (jokasimr):

Here the dimension to slice into chunks and parallelize over is decided.
The dimension with the largest size is used because that gives us a more even load on the separate processes.

Contributor Author (jokasimr), Nov 18, 2024:

We might want to add some kind of heuristic for when not to use multiprocessing and instead rely on the built-in thread parallelism of scipp.

A heuristic that seems reasonable from experiments (on my laptop) is to switch off multiprocessing when the size of each fit grows beyond 5000 points per core. Are there other suggestions for heuristics to use?
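One possible reading of that heuristic as code (a sketch; the names and exact cutoff are illustrative):

POINTS_PER_CORE = 5000  # threshold suggested by the laptop experiments above


def should_multiprocess(fit_size: int, cores: int) -> bool:
    # Small fits are dominated by per-fit scipy overhead, so distributing
    # them over processes pays off; large fits are better served by
    # scipp's built-in thread parallelism.
    return fit_size <= POINTS_PER_CORE * cores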

Contributor Author (jokasimr):

Update: a heuristic has been added

@SimonHeybrock (Member) commented:

In #3603 (comment) you showed that using multi-processing is slower in some cases, and provides a relatively minor speedup in other cases. Are you doing something different here? Did you get to the bottom of why the speedup is so marginal? Are you avoiding slowdowns?

@jokasimr (Contributor Author) commented Nov 18, 2024:

> you showed that using multi-processing is slower in some cases, and provides a relatively minor speedup in other cases. Are you doing something different here? Did you get to the bottom of why the speedup is so marginal? Are you avoiding slowdowns?

Note that in the figure a lower y-value (runtime) is better.

The benchmark was made on a computer with 4 cores, so a speedup factor of more than 4 is not expected, and that is close to what's in the figure for small fit_size.

When the fit size grows the overhead of scipy.curve_fit shrinks and the thread parallelism of scipp becomes more effective, so the multiprocessing becomes less useful (and maybe even harmful because of the serialization cost).

It's good to think about the impact in terms of absolute time. For large fit sizes the routine is already fast, so a relative slowdown there is less important than the relative speedup for small fit sizes.
To illustrate the difference I made two figures displaying absolute runtime as a function of fit_size and the number of worker processes. The first is for a problem with 100,000 points (that is, 100,000 / fit_size curves, each with fit_size points). The second is for a 10x larger problem with 1,000,000 points.

(Neither 100k nor 1M is particularly large in this context, considering that a 1d fit over a 2d grid could easily involve 100M points.)

Note that now the benchmark is run on a workstation with 12 cores.

100k points

[figure: runtime_100_000 — absolute runtime as a function of fit_size and number of workers]

1M points

[figure: runtime_1000_000 — absolute runtime as a function of fit_size and number of workers]

It's clear from the figures that the multiprocessing has an overhead of roughly 0.1 seconds, and if the fit is fast that becomes a relative slowdown. In most use cases that's not relevant, but the speedup from seconds to sub-seconds for low fit_size is.

@TheFermiSea commented:

Hi all,
I have been testing this new implementation and have noticed that sometimes (only very rarely) the try/except block starting at line 307 kills the whole run instead of just raising the exception (the error occurs at line 678, but it surely has something to do with the try/except). I really cannot figure out why it is not reproducible (rerunning identical code on identical data usually works the second time). It seems that pool.starmap_async allows for callbacks to handle per-worker errors like this, but I think this would be incompatible with the current organization logic. Any suggestions? If I catch the anomalous behavior again, I will post the traceback.
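For reference, a minimal sketch of the per-worker error callback mentioned above (the task function is made up; the PR does not use this pattern):

from multiprocessing import Pool


def risky(a, b):
    if b == 0:
        raise ZeroDivisionError('b must be nonzero')
    return a / b


if __name__ == '__main__':
    with Pool(2) as pool:
        result = pool.starmap_async(
            risky,
            [(1, 2), (3, 0)],
            error_callback=lambda err: print('worker failed:', err),
        )
        # get() re-raises the first worker exception in the parent process.
        print(result.get())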

@jokasimr (Contributor Author) commented:

Thank you for trying it out, it's much appreciated! If you encounter the problem again, please post the traceback here, or feel free to open an issue.

It's hard to tell what might've gone wrong without more context. But note that the try-except on line 307 only catches the exception raised when scipy.curve_fit fails to find a good fit; any other exception is re-raised.
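As an illustration of that behavior (an assumed shape of the code, not the actual lines from curve_fit.py):

import numpy as np
import scipy.optimize


def _fit_one(f, x, y, p0):
    try:
        popt, pcov = scipy.optimize.curve_fit(f, x, y, p0=p0)
    except RuntimeError:
        # scipy.curve_fit raises RuntimeError when it fails to converge;
        # report NaN for this curve instead of aborting the whole run.
        popt = np.full(len(p0), np.nan)
        pcov = np.full((len(p0), len(p0)), np.nan)
    return popt, pcov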

@MridulS requested a review from Copilot, November 20, 2024.
Copilot AI left a comment:

Copilot reviewed 3 out of 14 changed files in this pull request and generated 2 suggestions.

Files not reviewed (11)
  • requirements/asv.txt: Language not supported
  • requirements/base.in: Language not supported
  • requirements/base.txt: Language not supported
  • requirements/build.txt: Language not supported
  • requirements/ci.txt: Language not supported
  • requirements/coverage.txt: Language not supported
  • requirements/docs.txt: Language not supported
  • requirements/extra.txt: Language not supported
  • requirements/mypy.txt: Language not supported
  • requirements/static.txt: Language not supported
  • requirements/test.txt: Language not supported
Comments skipped due to low confidence (1)

src/scipp/curve_fit.py:245

  • [nitpick] The function name _curve_fit_chunk is not very descriptive. It could be renamed to _parallel_curve_fit_chunk to better reflect its purpose in the context of multiprocessing.
def _curve_fit_chunk(

workers = psutil.cpu_count(logical=False) if workers is None else workers
try:
    pickle.dumps(f)
except (AttributeError, pickle.PicklingError) as err:
    raise ValueError(
        'The provided fit function is not pickleable and can not be '
        'used with the multiprocessing module.'
    ) from err
Copilot AI, Nov 20, 2024:

The error message 'The provided fit function is not pickleable and can not be used with the multiprocessing module.' could be more specific by including the name of the function that caused the error.

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@jokasimr removed the request for review from SimonHeybrock, December 5, 2024.
pyproject.toml Outdated
requires-python = ">=3.10"
dependencies = [
    "numpy >= 1.20",
    "psutil",
Member:

I am not comfortable adding this as a new required dependency for all of Scipp. Can we do without it?

Contributor Author (jokasimr):

We could fall back to using os.process_cpu_count (https://docs.python.org/3/library/os.html#os.process_cpu_count) if psutil is not installed.

The drawback is that os.process_cpu_count returns the number of logical cores, but for best performance we want the number of physical cores.

Maybe falling back to os.process_cpu_count() // 2 is a good solution? See the sketch below.
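As a sketch, that fallback could look like this (hypothetical code; note that os.process_cpu_count exists only on Python 3.13+):

import os


def default_workers() -> int:
    try:
        import psutil

        # Physical cores give the best multiprocessing performance.
        return psutil.cpu_count(logical=False) or 1
    except ImportError:
        # process_cpu_count reports logical cores; halving roughly
        # approximates the physical core count.
        return max(1, (os.process_cpu_count() or 2) // 2)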

Member:

How about using the value of the workers argument? Then the user can decide.

Contributor Author (jokasimr):

Yeah, maybe that's just easiest. I was thinking it's good to have an option to "use all cores", but it's probably not necessary, especially since we're making the multiprocessing opt-in anyway.

Comment on lines 196 to 216
def _serialize_variable(v):
    return (v.dims, v.values, v.variances, str(v.unit) if v.unit is not None else None)


def _serialize_mapping(v):
    return (tuple(v.keys()), tuple(map(_serialize_variable, v.values())))


def _serialize_bounds(v):
    return (
        tuple(v.keys()),
        tuple(tuple(map(_serialize_variable, pair)) for pair in v.values()),
    )


def _serialize_data_array(da):
    return (
        _serialize_variable(da.data),
        _serialize_mapping(da.coords),
        _serialize_mapping(da.masks),
    )
Member:

Contributor Author (jokasimr):

A previous version of this PR used save_hdf5 to a BytesIO stream and sent that, but it was slower than the current solution. I guess the HDF5 serialization is expensive?

I'll have a look at compat.to_dict and see if that can be used instead 👍

Contributor Author (jokasimr):

Using the to_dict method ran into TypeError: cannot pickle 'scipp._scipp.core.Unit' object and TypeError: cannot pickle 'scipp._scipp.core.DType' object.

With some tweaks it could probably be made to work anyway, but I don't think the resulting code would be simpler than it is now. The bounds serialization would have to be custom anyway.
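For context, a hypothetical counterpart on the receiving side, mirroring _serialize_variable (not part of the diff):

import scipp as sc


def _deserialize_variable(t):
    # Rebuild a Variable from the (dims, values, variances, unit) tuple.
    dims, values, variances, unit = t
    return sc.array(dims=dims, values=values, variances=variances, unit=unit)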

    return left, right


def _select_data_params_and_bounds(sel, da, p0, bounds):
Member:

Missing type hints.

    map_over,
    unsafe_numpy_f,
    kwargs,
):
Member:

Missing type hints.

Contributor Author (jokasimr):

Is it really useful to add type hints to helper functions? I typically don't do that.

Or do you mean it would be good to have for this particular function but not for the other helper functions in this module?

Member:

Depends. It can help the reader (reviewer) a lot.

Contributor Author (jokasimr):

Added now 👍

Comment on lines 764 to 779
def test_with_nonpickle_function():
    da = array2d(a=1.2, b=1.3, noise_scale=0.01)

    def local_function(x, a, b):
        return func(x, a, b)

    with pytest.raises(ValueError, match='The provided fit function is not'):
        curve_fit(['x'], local_function, da)

    with pytest.raises(ValueError, match='The provided fit function is not'):
        curve_fit(['x'], lambda x, a, b: func(x, a, b), da)

    # Explicitly disabling multiprocessing lets you use a local function
    curve_fit(['x'], local_function, da, workers=1)
    # Function in __main__ scope works
    curve_fit(['x'], func, da)
Member:

I am not sure this is good default behavior. We should make this work by default, either by not multiprocessing if it cannot be pickled, or by disabling multi-processing by default, making it opt-in instead of opt-out.

Contributor Author (jokasimr):

Making it opt-in sounds like the best solution.

Enabling multiprocessing by default but silently disabling it if the function cannot be pickled would lead to surprising performance regressions when someone replaces a main-scope function with a lambda or similar. So in my opinion that's not the best option.

Comment on lines 683 to 692
par, cov = _curve_fit_chunk(
    coords,
    f,
    da,
    p0,
    bounds,
    map_over,
    unsafe_numpy_f,
    kwargs,
)
Member:

I think we should use keyword args here.

Comment on lines 694 to 695
par, cov = _datagroup_outputs(da, p0, map_over, par, cov)
return par, cov
Member:

I suppose par and cov have different types before and after. Mypy would not like that, so better to just return without naming the return values of _datagroup_outputs?

Contributor Author (jokasimr):

Yes, before they are Numpy arrays and after they are scipp DataGroup.

just return without naming the return values of _datagroup_outputs

Sounds good!

Comment on lines +637 to +638
max_size = max((da.sizes[dim] for dim in map_over))
max_size_dim = next((dim for dim in map_over if da.sizes[dim] == max_size))
Member:

Remind me why we don't simply flatten the dims being mapped over? Then we would have a single, potentially much longer dim to use for parallelization?

Contributor Author (jokasimr):

If I understand you correctly:

Let's say we have a 2D array with dims x, y where we want to fit f(x, a, b) = a x^2 + b and map over y to obtain the parameters as functions of y: a(y) and b(y).
The suggestion is that we flatten the 2D array to 1D and then fit, but then we would only get a single pair of parameters a and b. This is what happens if reduce_dims contains y.

So the reason we don't flatten the dims being mapped over is that we want a different set of parameters for each entry of the dim being mapped over.
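A hypothetical usage sketch of that distinction (assuming the curve_fit call signature exercised by this PR's tests; the data construction is made up):

import scipp as sc
from scipp import curve_fit


def f(x, a, b):
    return a * x**2 + b


x = sc.linspace('x', 0.0, 1.0, 100)
da = sc.DataArray(
    # Broadcast the model along y so every y-entry holds one curve.
    f(x, 1.2, 0.3) * sc.ones(dims=['y'], shape=[5]),
    coords={'x': x, 'y': sc.arange('y', 5)},
)

# Map over y: one (a, b) pair per entry of y.
popt, _ = curve_fit(['x'], f, da)                     # popt['a'] has dim 'y'
# Reduce y instead: a single (a, b) pair for all the data.
popt, _ = curve_fit(['x'], f, da, reduce_dims=['y'])  # popt['a'] is 0-D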

Member:

👍


# Only parallelize if the user did not explicitly ask for a single worker
# and a suitable dimension for parallelization was found.
if workers != 1 and pardim is not None:
Member:

Relating to my comment in the tests regarding failing if the function cannot be pickled: this demonstrates that it is indeed quite problematic. Say someone writes and tests some fitting code on small examples. Then pardim may be None, bypassing the pickling error. When put into production we suddenly have pardim != None, and the code that worked perfectly in the tests fails.

Can we make the behavior more consistent?

Contributor Author (jokasimr):

That's a great point. Let's make the multiprocessing opt-in.

@SimonHeybrock self-assigned this, December 20, 2024.
Comment on lines 781 to 801
def test_multiprocessing_used_if_several_workers_requested():
    da = array2d(a=1.2, b=1.3, noise_scale=0.01)

    # The point of this test is to verify that
    # the "workers" argument actually does what we expect.
    start = time.perf_counter()
    curve_fit(['x'], func, da)
    end = time.perf_counter()
    without_mp = end - start

    start = time.perf_counter()
    curve_fit(['x'], func, da, workers=100)
    end = time.perf_counter()
    with_mp = end - start

    # It looks weird that we're asserting it's much slower with multiprocessing.
    # Isn't the point to **improve** performance?
    # But here the array is so small that it doesn't make any sense to parallelize,
    # and we're requesting too many workers, so it should be very slow
    # if the "workers" argument is respected.
    assert with_mp > 10 * without_mp
Member (SimonHeybrock), Jan 6, 2025:

Is this test actually useful? It may break when we want to change an implementation detail, and even now it may be flaky in ways that you have not anticipated.

Contributor Author (jokasimr):

If you don't think it's useful I'll remove it.

@jokasimr enabled auto-merge (squash), January 16, 2025.
@jokasimr merged commit 591c8e2 into main, January 16, 2025 — 4 checks passed.
@jokasimr deleted the par-curve-fit branch, January 16, 2025.

Development

Successfully merging this pull request may close these issues.

Multiprocess parallelism in curve_fit
