
Add option to specify an initialization function for 'loky' and 'multiprocessing' backends #1525

Merged
tomMoral merged 30 commits into joblib:main from shwina:add-initializer-for-loky-and-mp
Apr 11, 2025
Conversation

@shwina
Contributor

@shwina shwina commented Nov 20, 2023

Closes #381

This PR adds the ability to specify an initialization function that is run once per worker process when using the 'loky' and 'multiprocessing' backends.

Usage:

Parallel(initializer=my_func, initargs=(x, y, z))
# or
with parallel_config(initializer=my_func, initargs=(x, y, z)):
    ...
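The semantics follow the `initializer`/`initargs` contract of the stdlib pools these backends wrap: the function runs once per worker process, before any task is dispatched. A minimal, runnable sketch of that contract using the stdlib `multiprocessing.Pool` (the names `init_worker`, `task`, and `_state` are illustrative, not from the PR):

```python
import multiprocessing as mp

# Per-process state populated by the initializer, not by tasks.
_state = {}

def init_worker(offset):
    # Runs once in each worker process, before any task is executed there.
    _state["offset"] = offset

def task(v):
    # Tasks can rely on the initializer having already run in this process.
    return v + _state["offset"]

def run():
    with mp.Pool(processes=2, initializer=init_worker, initargs=(10,)) as pool:
        return pool.map(task, range(4))

if __name__ == "__main__":
    print(run())  # [10, 11, 12, 13]
```

joblib forwards `initializer` and `initargs` to the underlying pool, so the same pattern applies when they are passed to `Parallel` or `parallel_config` as shown above.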

@shwina
Contributor Author

shwina commented Feb 6, 2024

Hi @ogrisel - wondering if you have any thoughts on how to test the implementation here correctly?

@bdice

bdice commented Jun 24, 2024

Hi @ogrisel, this feature request has come up a couple times recently, including in a StackOverflow question: https://stackoverflow.com/questions/78642680/using-load-ext-cudf-pandas-throws-attributeerror

Would it be possible to work with a joblib developer to identify how to move forward with tests?

@SJ-Innovation

Hi, just wondering if there's been any behind-the-scenes motion on this, as this would be a really handy piece of functionality for pre-seeding workers with large, constant, but procedurally generated/loaded datasets at the start of a large batch of jobs.
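That pre-seeding pattern can be sketched with the stdlib `multiprocessing.Pool`, whose `initializer`/`initargs` contract this PR exposes through joblib. `load_dataset` here is a hypothetical stand-in for an expensive, deterministic dataset build:

```python
import multiprocessing as mp

# Large read-only dataset, built once per worker by the initializer.
_dataset = None

def load_dataset(seed, size):
    # Stand-in for an expensive, deterministic load; runs once per worker
    # process instead of once per task.
    global _dataset
    _dataset = [seed + i for i in range(size)]

def lookup(i):
    # Tasks read the pre-seeded dataset without rebuilding it.
    return _dataset[i]

def run():
    with mp.Pool(2, initializer=load_dataset, initargs=(100, 1000)) as pool:
        return pool.map(lookup, [0, 10, 999])

if __name__ == "__main__":
    print(run())  # [100, 110, 1099]
```

The payoff is that the load cost is paid once per worker rather than once per task, which matters when the dataset is large relative to the per-task work.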

@tomMoral tomMoral marked this pull request as ready for review February 27, 2025 23:37
@codecov

codecov bot commented Feb 27, 2025

Codecov Report

Attention: Patch coverage is 96.20253% with 3 lines in your changes missing coverage. Please review.

Project coverage is 95.50%. Comparing base (ad13c66) to head (0d9412d).
Report is 23 commits behind head on main.

Files with missing lines Patch % Lines
joblib/_parallel_backends.py 90.90% 1 Missing ⚠️
joblib/parallel.py 85.71% 1 Missing ⚠️
joblib/test/test_parallel.py 98.36% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1525      +/-   ##
==========================================
- Coverage   95.54%   95.50%   -0.04%     
==========================================
  Files          46       46              
  Lines        7741     7838      +97     
==========================================
+ Hits         7396     7486      +90     
- Misses        345      352       +7     


Contributor

@ogrisel ogrisel left a comment


Thanks for finalizing this PR. It appears that we have a problem with what we test, though.

tomMoral and others added 3 commits April 7, 2025 11:40
Co-authored-by: Olivier Grisel <olivier.grisel@ensta.org>
Co-authored-by: Olivier Grisel <olivier.grisel@ensta.org>
Contributor

@ogrisel ogrisel left a comment


LGTM. Hopefully the new pid-based assert statement will not randomly fail when some workers take too long to start up compared to others.

If that happens, we will have to rewrite those new tests to make them more robust to such race conditions.

@ogrisel
Contributor

ogrisel commented Apr 7, 2025

Let's wait for the CI to go through and maybe trigger it a few consecutive times before merging.

@tomMoral
Contributor

tomMoral commented Apr 7, 2025

This does not seem stable because the first worker can fetch the second job in the window between the initializer finishing and the second worker picking up a job.

We can either make the test more complex to fix the race condition (using a counter instead of True/False), or test that the number of pids is large enough.
I fixed it to be more stable, I think. I ran the tests three times; hopefully it is good now :)
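A race-tolerant test along those lines can be sketched with the stdlib pool: instead of asserting which worker ran each task, it only checks that every task observed an initialized worker and that no more distinct pids appear than workers were requested. All names here are illustrative, not the PR's actual test:

```python
import multiprocessing as mp
import os

_initialized = False

def init_worker():
    # Flag set once per worker process by the initializer.
    global _initialized
    _initialized = True

def probe(_):
    # Each task reports its worker pid and whether the initializer ran there.
    return os.getpid(), _initialized

def run():
    with mp.Pool(2, initializer=init_worker) as pool:
        return pool.map(probe, range(8))

if __name__ == "__main__":
    results = run()
    # Robust to the race: we don't assume how tasks were distributed,
    # only that every worker that took a task had been initialized.
    assert all(flag for _, flag in results)
    print(len({pid for pid, _ in results}) <= 2)  # True
```

The assertion holds even if one fast worker grabs every task, which is exactly the scheduling race described above.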

@lesteve
Member

lesteve commented Apr 10, 2025

What is the expected behaviour when the initializer raises an error? Maybe we can add a test for it?

Quickly trying with loky, you get a TerminatedWorkerError, and above that stack trace you can see the traceback from the initializer error:

import math
from joblib import Parallel, delayed

def initializer(): 1/0

Parallel(n_jobs=2, initializer=initializer)(delayed(math.sqrt)(i*i) for i in range(10))
Exception in initializer:
Traceback (most recent call last):
  File "/home/lesteve/dev/joblib/joblib/externals/loky/process_executor.py", line 409, in _process_worker
    initializer(*initargs)
  File "<ipython-input-21-a74918bfdf5d>", line 4, in initializer
ZeroDivisionError: division by zero
Exception in initializer:
Traceback (most recent call last):
  File "/home/lesteve/dev/joblib/joblib/externals/loky/process_executor.py", line 409, in _process_worker
    initializer(*initargs)
  File "<ipython-input-21-a74918bfdf5d>", line 4, in initializer
ZeroDivisionError: division by zero
---------------------------------------------------------------------------
TerminatedWorkerError                     Traceback (most recent call last)
Cell In[21], line 6
      2 from joblib import Parallel, delayed
      4 def initializer(): 1/0
----> 6 Parallel(n_jobs=2, initializer=initializer)(delayed(math.sqrt)(i*i) for i in range(10))

File ~/dev/joblib/joblib/parallel.py:2071, in Parallel.__call__(self, iterable)
   2065 # The first item from the output is blank, but it makes the interpreter
   2066 # progress until it enters the Try/Except block of the generator and
   2067 # reaches the first `yield` statement. This starts the asynchronous
   2068 # dispatch of the tasks to the workers.
   2069 next(output)
-> 2071 return output if self.return_generator else list(output)

File ~/dev/joblib/joblib/parallel.py:1681, in Parallel._get_outputs(self, iterator, pre_dispatch)
   1678     yield
   1680     with self._backend.retrieval_context():
-> 1681         yield from self._retrieve()
   1683 except GeneratorExit:
   1684     # The generator has been garbage collected before being fully
   1685     # consumed. This aborts the remaining tasks if possible and warn
   1686     # the user if necessary.
   1687     self._exception = True

File ~/dev/joblib/joblib/parallel.py:1783, in Parallel._retrieve(self)
   1777 while self._wait_retrieval():
   1778     # If the callback thread of a worker has signaled that its task
   1779     # triggered an exception, or if the retrieval loop has raised an
   1780     # exception (e.g. `GeneratorExit`), exit the loop and surface the
   1781     # worker traceback.
   1782     if self._aborting:
-> 1783         self._raise_error_fast()
   1784         break
   1786     nb_jobs = len(self._jobs)

File ~/dev/joblib/joblib/parallel.py:1858, in Parallel._raise_error_fast(self)
   1854 # If this error job exists, immediately raise the error by
   1855 # calling get_result. This job might not exists if abort has been
   1856 # called directly or if the generator is gc'ed.
   1857 if error_job is not None:
-> 1858     error_job.get_result(self.timeout)

File ~/dev/joblib/joblib/parallel.py:757, in BatchCompletionCallBack.get_result(self, timeout)
    751 backend = self.parallel._backend
    753 if backend.supports_retrieve_callback:
    754     # We assume that the result has already been retrieved by the
    755     # callback thread, and is stored internally. It's just waiting to
    756     # be returned.
--> 757     return self._return_or_raise()
    759 # For other backends, the main thread needs to run the retrieval step.
    760 try:

File ~/dev/joblib/joblib/parallel.py:772, in BatchCompletionCallBack._return_or_raise(self)
    770 try:
    771     if self.status == TASK_ERROR:
--> 772         raise self._result
    773     return self._result
    774 finally:

TerminatedWorkerError: A worker process managed by the executor was unexpectedly terminated. This could be caused by a segmentation fault while calling the function or by an excessive memory usage causing the Operating System to kill the worker.

The exit codes of the workers are {EXIT(0)}

With multiprocessing, you get an infinite loop of error messages until you press Control-C (the behaviour is similar when using multiprocessing.Pool.map):

import math
from joblib import Parallel, delayed

def initializer(): 1/0

Parallel(n_jobs=2, backend='multiprocessing', initializer=initializer)(delayed(math.sqrt)(i*i) for i in range(10))
Process ForkPoolWorker-16075:
Traceback (most recent call last):
  File "/home/lesteve/micromamba/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/lesteve/micromamba/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/lesteve/micromamba/lib/python3.12/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "<ipython-input-21-a74918bfdf5d>", line 4, in initializer
    def initializer(): 1/0
                       ~^~
ZeroDivisionError: division by zero
Process ForkPoolWorker-16078:
Traceback (most recent call last):
  File "/home/lesteve/micromamba/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/home/lesteve/micromamba/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/lesteve/micromamba/lib/python3.12/multiprocessing/pool.py", line 109, in worker
    initializer(*initargs)
  File "<ipython-input-21-a74918bfdf5d>", line 4, in initializer
    def initializer(): 1/0
                       ~^~
ZeroDivisionError: division by zero
.
.
.
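For comparison, the stdlib `concurrent.futures.ProcessPoolExecutor` (whose design loky extends) handles a failing initializer by marking the pool broken rather than respawning workers forever: pending futures then raise `BrokenProcessPool`, which is analogous to loky's `TerminatedWorkerError` above. A sketch assuming that documented behaviour:

```python
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import BrokenProcessPool

def bad_init():
    # Deliberately failing initializer, mirroring the 1/0 example above.
    raise ZeroDivisionError("boom")

def run():
    try:
        with ProcessPoolExecutor(max_workers=2, initializer=bad_init) as ex:
            # Workers die during startup, so the executor becomes broken
            # and the pending futures fail instead of looping.
            return list(ex.map(abs, range(4)))
    except BrokenProcessPool:
        return "broken"

if __name__ == "__main__":
    print(run())
```

The contrast with `multiprocessing.Pool`, which keeps replacing the dead workers, is what produces the endless error loop shown in the multiprocessing output above.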

@tomMoral
Contributor

For the behavior in case of errors, I think this is backend-specific and I would like to say joblib is not responsible 😅
This PR allows passing extra args to the backend, which are mapped to the backend's specifics, so I would not test more in depth.

I propose merging the PR as is; we can always improve testing in a follow-up PR if we feel it is necessary.

@tomMoral tomMoral merged commit 3aa56cc into joblib:main Apr 11, 2025
29 checks passed
@tomMoral
Contributor

Thanks a lot @shwina, for the proposal and for your patience!



Development

Successfully merging this pull request may close these issues.

No simple way to pass initializer for process

6 participants