Skip to content

Error raised when caching in parallel and function code has changed #92

@AlexandreAbraham

Description

@AlexandreAbraham

I am using joblib to cache calls in parallel: the same function is called in parallel with different arguments. The problem occurs when the code of the function has changed. I think that all processes try to clear the function's cache directory and rewrite the file func_code.py at the same time, resulting in an error.

Here is the stacktrace:

Traceback (most recent call last):
  File "run.py", line 264, in <module>
    for (subject_series, movement) in zip(series, dataset.movement))
  File "/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/parallel.py", line 561, in __call__
    self.retrieve()
  File "/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/parallel.py", line 483, in retrieve
    raise exception_type(report)
joblib.my_exceptions.JoblibIOError/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/my_exceptions.py:26: DeprecationWarning: BaseException.message has been deprecated as of Python 2.6
  self.message,
: JoblibIOError
___________________________________________________________________________
Multiprocessing exception:
    ...........................................................................
/mnt/neurospin/sel-poivre/tmp/aa013911/GSPCA_mu1.00_l12.50_a0.04/run.py in <module>()
    259         # recomputation when adding a step. Each step is cached independently
    260         # in the function.
    261         print ('\t\tExtracting subject information')
    262         result = Parallel(n_jobs=2)(delayed(runner.process_subject)(
    263             subject_series, regions, gm_index, movement, memory=regions_memory)
--> 264             for (subject_series, movement) in zip(series, dataset.movement))
    265         regions_series, confounds, covariance, precision = \
    266             zip(*result)
    267 
    268         # Explained variance

...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/parallel.py in __call__(self=Parallel(n_jobs=2), iterable=<generator object <genexpr>>)
    556         self.n_dispatched = 0
    557         try:
    558             for function, args, kwargs in iterable:
    559                 self.dispatch(function, args, kwargs)
    560 
--> 561             self.retrieve()
        self.retrieve = <bound method Parallel.retrieve of Parallel(n_jobs=2)>
    562             # Make sure that we get a last message telling us we are done
    563             elapsed_time = time.time() - self._start_time
    564             self._print('Done %3i out of %3i | elapsed: %s finished',
    565                         (len(self._output),

    ---------------------------------------------------------------------------
    Sub-process traceback:
    ---------------------------------------------------------------------------
    IOError                                            Tue Dec  3 07:46:50 2013
PID: 15711     Python 2.7.3: /home/aa013911/epd-7.3-2-rh5-x86_64/bin/python
...........................................................................
/home/aa013911/abraham_miccai2013/msdl/runner.pyc in process_subject(series=MemorizedResult(cachedir="../abide/joblib", func...argument_hash="eab6c74fde4a396a99076425288fa670"), regions=memmap([[-0.45718721, -0.47443968, -0.48146474, ...       -0.42739522, -0.41334313]], dtype=float32), gm_index=[False, True, True, False, False, True, True, True, False, False, True, True, True, True, False, False, False, True, True, True, ...], movement='../dataset/ABIDE/Leuven/Leuven_50683/rp_rest.txt', memory=Memory(cachedir='./model3_noop/joblib'))
    116 
    117     confounds = memory.cache(extract_confounds).call_and_shelve(
    118             regions_series, regions, gm_index=gm_index)
    119 
    120     covariance, precision = memory.cache(covariance_matrix)(
--> 121             regions_series, confounds=[movement, confounds])
    122 
    123     return regions_series, confounds, covariance, precision
    124 
    125 

...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in __call__(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), *args=(MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="a5abfb42d4bc86e395de839fa65840c9"),), **kwargs={'confounds': ['../dataset/ABIDE/Leuven/Leuven_50683/rp_rest.txt', MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="e880120cd72429390ae89b5dabba252a")]})
    478         return MemorizedResult(self.cachedir, self.func, argument_hash,
    479             metadata=metadata, verbose=self._verbose - 1,
    480             timestamp=self.timestamp)
    481 
    482     def __call__(self, *args, **kwargs):
--> 483         return self._cached_call(args, kwargs)[0]
    484 
    485     def __reduce__(self):
    486         """ We don't store the timestamp when pickling, to avoid the hash
    487             depending from it.

...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in _cached_call(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), args=(MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="a5abfb42d4bc86e395de839fa65840c9"),), kwargs={'confounds': ['../dataset/ABIDE/Leuven/Leuven_50683/rp_rest.txt', MemorizedResult(cachedir="./model3_noop/joblib",...argument_hash="e880120cd72429390ae89b5dabba252a")]})
    425             # Compare the function code with the previous to see if the
    426             # function code has changed
    427             output_dir, argument_hash = self._get_output_dir(*args, **kwargs)
    428         metadata = None
    429         # FIXME: The statements below should be try/excepted
--> 430         if not (self._check_previous_func_code(stacklevel=4) and
        t = undefined
    431                                  os.path.exists(output_dir)):
    432             if self._verbose > 10:
    433                 _, name = get_func_name(self.func)
    434                 self.warn('Computing func %s, argument hash %s in '

...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in _check_previous_func_code(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), stacklevel=4)
    618         # XXX: Should be using warnings, and giving stacklevel
    619         if self._verbose > 10:
    620             _, func_name = get_func_name(self.func, resolv_alias=False)
    621             self.warn("Function %s (stored in %s) has changed." %
    622                         (func_name, func_dir))
--> 623         self.clear(warn=True)
    624         return False
    625 
    626     def clear(self, warn=True):
    627         """ Empty the function's cache.

...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in clear(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), warn=True)
    632         if os.path.exists(func_dir):
    633             shutil.rmtree(func_dir, ignore_errors=True)
    634         mkdirp(func_dir)
    635         func_code, _, first_line = get_func_code(self.func)
    636         func_code_file = os.path.join(func_dir, 'func_code.py')
--> 637         self._write_func_code(func_code_file, func_code, first_line)
    638 
    639     def call(self, *args, **kwargs):
    640         """ Force the execution of the function with the given arguments and
    641             persist the output values.

...........................................................................
/home/aa013911/epd-7.3-2-rh5-x86_64/lib/python2.7/site-packages/joblib-0.7.1-py2.7.egg/joblib/memory.pyc in _write_func_code(self=MemorizedFunc(func=<function covariance_matrix at 0xc51a0c8>, cachedir='./model3_noop/joblib'), filename='./model3_noop/joblib/msdl/covariance/covariance_matrix/func_code.py', func_code='# first line: 7\ndef covariance_matrix(series, co...np.eye(series.shape[1]), np.eye(series.shape[1])\n', first_line=7)
    546 
    547     def _write_func_code(self, filename, func_code, first_line):
    548         """ Write the function code and the filename to a file.
    549         """
    550         func_code = '%s %i\n%s' % (FIRST_LINE_TEXT, first_line, func_code)
--> 551         with open(filename, 'w') as out:
    552             out.write(func_code)
    553 
    554     def _check_previous_func_code(self, stacklevel=2):
    555         """

IOError: [Errno 2] No such file or directory: './model3_noop/joblib/msdl/covariance/covariance_matrix/func_code.py'
___________________________________________________________________________

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions