1515# this resource tracker process, "killall python" would probably leave unlinked
1616# resources.
1717
18+ import base64
1819import os
1920import signal
2021import sys
2122import threading
2223import warnings
24+ from collections import deque
25+
26+ import json
2327
2428from . import spawn
2529from . import util
@@ -66,6 +70,14 @@ def __init__(self):
6670 self ._fd = None
6771 self ._pid = None
6872 self ._exitcode = None
73+ self ._reentrant_messages = deque ()
74+
75+ # True to use colon-separated lines, rather than JSON lines,
76+ # for internal communication. (Mainly for testing).
77+ # Filenames not supported by the simple format will always be sent
78+ # using JSON.
79+ # The reader should understand all formats.
80+ self ._use_simple_format = True
6981
7082 def _reentrant_call_error (self ):
7183 # gh-109629: this happens if an explicit call to the ResourceTracker
@@ -102,7 +114,7 @@ def _stop_locked(
102114 # This shouldn't happen (it might when called by a finalizer)
103115 # so we check for it anyway.
104116 if self ._lock ._recursion_count () > 1 :
105- return self ._reentrant_call_error ()
117+ raise self ._reentrant_call_error ()
106118 if self ._fd is None :
107119 # not running
108120 return
@@ -113,7 +125,12 @@ def _stop_locked(
113125 close (self ._fd )
114126 self ._fd = None
115127
116- _ , status = waitpid (self ._pid , 0 )
128+ try :
129+ _ , status = waitpid (self ._pid , 0 )
130+ except ChildProcessError :
131+ self ._pid = None
132+ self ._exitcode = None
133+ return
117134
118135 self ._pid = None
119136
@@ -132,76 +149,119 @@ def ensure_running(self):
132149
133150 This can be run from any process. Usually a child process will use
134151 the resource created by its parent.'''
152+ return self ._ensure_running_and_write ()
153+
154+ def _teardown_dead_process (self ):
155+ os .close (self ._fd )
156+
157+ # Clean-up to avoid dangling processes.
158+ try :
159+ # _pid can be None if this process is a child from another
160+ # python process, which has started the resource_tracker.
161+ if self ._pid is not None :
162+ os .waitpid (self ._pid , 0 )
163+ except ChildProcessError :
164+ # The resource_tracker has already been terminated.
165+ pass
166+ self ._fd = None
167+ self ._pid = None
168+ self ._exitcode = None
169+
170+ warnings .warn ('resource_tracker: process died unexpectedly, '
171+ 'relaunching. Some resources might leak.' )
172+
173+ def _launch (self ):
174+ fds_to_pass = []
175+ try :
176+ fds_to_pass .append (sys .stderr .fileno ())
177+ except Exception :
178+ pass
179+ r , w = os .pipe ()
180+ try :
181+ fds_to_pass .append (r )
182+ # process will out live us, so no need to wait on pid
183+ exe = spawn .get_executable ()
184+ args = [
185+ exe ,
186+ * util ._args_from_interpreter_flags (),
187+ '-c' ,
188+ f'from multiprocessing.resource_tracker import main;main({ r } )' ,
189+ ]
190+ # bpo-33613: Register a signal mask that will block the signals.
191+ # This signal mask will be inherited by the child that is going
192+ # to be spawned and will protect the child from a race condition
193+ # that can make the child die before it registers signal handlers
194+ # for SIGINT and SIGTERM. The mask is unregistered after spawning
195+ # the child.
196+ prev_sigmask = None
197+ try :
198+ if _HAVE_SIGMASK :
199+ prev_sigmask = signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
200+ pid = util .spawnv_passfds (exe , args , fds_to_pass )
201+ finally :
202+ if prev_sigmask is not None :
203+ signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
204+ except :
205+ os .close (w )
206+ raise
207+ else :
208+ self ._fd = w
209+ self ._pid = pid
210+ finally :
211+ os .close (r )
212+
213+ def _make_probe_message (self ):
214+ """Return a probe message."""
215+ if self ._use_simple_format :
216+ return b'PROBE:0:noop\n '
217+ return (
218+ json .dumps (
219+ {"cmd" : "PROBE" , "rtype" : "noop" },
220+ ensure_ascii = True ,
221+ separators = ("," , ":" ),
222+ )
223+ + "\n "
224+ ).encode ("ascii" )
225+
226+ def _ensure_running_and_write (self , msg = None ):
135227 with self ._lock :
136228 if self ._lock ._recursion_count () > 1 :
137229 # The code below is certainly not reentrant-safe, so bail out
138- return self ._reentrant_call_error ()
230+ if msg is None :
231+ raise self ._reentrant_call_error ()
232+ return self ._reentrant_messages .append (msg )
233+
139234 if self ._fd is not None :
140235 # resource tracker was launched before, is it still running?
141- if self ._check_alive ():
142- # => still alive
143- return
144- # => dead, launch it again
145- os .close (self ._fd )
146-
147- # Clean-up to avoid dangling processes.
236+ if msg is None :
237+ to_send = self ._make_probe_message ()
238+ else :
239+ to_send = msg
148240 try :
149- # _pid can be None if this process is a child from another
150- # python process, which has started the resource_tracker.
151- if self ._pid is not None :
152- os .waitpid (self ._pid , 0 )
153- except ChildProcessError :
154- # The resource_tracker has already been terminated.
155- pass
156- self ._fd = None
157- self ._pid = None
158- self ._exitcode = None
241+ self ._write (to_send )
242+ except OSError :
243+ self ._teardown_dead_process ()
244+ self ._launch ()
159245
160- warnings .warn ('resource_tracker: process died unexpectedly, '
161- 'relaunching. Some resources might leak.' )
246+ msg = None # message was sent in probe
247+ else :
248+ self ._launch ()
162249
163- fds_to_pass = []
164- try :
165- fds_to_pass .append (sys .stderr .fileno ())
166- except Exception :
167- pass
168- cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
169- r , w = os .pipe ()
250+ while True :
170251 try :
171- fds_to_pass .append (r )
172- # process will out live us, so no need to wait on pid
173- exe = spawn .get_executable ()
174- args = [exe ] + util ._args_from_interpreter_flags ()
175- args += ['-c' , cmd % r ]
176- # bpo-33613: Register a signal mask that will block the signals.
177- # This signal mask will be inherited by the child that is going
178- # to be spawned and will protect the child from a race condition
179- # that can make the child die before it registers signal handlers
180- # for SIGINT and SIGTERM. The mask is unregistered after spawning
181- # the child.
182- prev_sigmask = None
183- try :
184- if _HAVE_SIGMASK :
185- prev_sigmask = signal .pthread_sigmask (signal .SIG_BLOCK , _IGNORED_SIGNALS )
186- pid = util .spawnv_passfds (exe , args , fds_to_pass )
187- finally :
188- if prev_sigmask is not None :
189- signal .pthread_sigmask (signal .SIG_SETMASK , prev_sigmask )
190- except :
191- os .close (w )
192- raise
193- else :
194- self ._fd = w
195- self ._pid = pid
196- finally :
197- os .close (r )
252+ reentrant_msg = self ._reentrant_messages .popleft ()
253+ except IndexError :
254+ break
255+ self ._write (reentrant_msg )
256+ if msg is not None :
257+ self ._write (msg )
198258
199259 def _check_alive (self ):
200260 '''Check that the pipe has not been closed by sending a probe.'''
201261 try :
202262 # We cannot use send here as it calls ensure_running, creating
203263 # a cycle.
204- os .write (self ._fd , b'PROBE:0:noop \n ' )
264+ os .write (self ._fd , self . _make_probe_message () )
205265 except OSError :
206266 return False
207267 else :
@@ -215,27 +275,42 @@ def unregister(self, name, rtype):
215275 '''Unregister name of resource with resource tracker.'''
216276 self ._send ('UNREGISTER' , name , rtype )
217277
218- def _send (self , cmd , name , rtype ):
219- try :
220- self .ensure_running ()
221- except ReentrantCallError :
222- # The code below might or might not work, depending on whether
223- # the resource tracker was already running and still alive.
224- # Better warn the user.
225- # (XXX is warnings.warn itself reentrant-safe? :-)
226- warnings .warn (
227- f"ResourceTracker called reentrantly for resource cleanup, "
228- f"which is unsupported. "
229- f"The { rtype } object { name !r} might leak." )
230- msg = '{0}:{1}:{2}\n ' .format (cmd , name , rtype ).encode ('ascii' )
231- if len (msg ) > 512 :
232- # posix guarantees that writes to a pipe of less than PIPE_BUF
233- # bytes are atomic, and that PIPE_BUF >= 512
234- raise ValueError ('msg too long' )
278+ def _write (self , msg ):
235279 nbytes = os .write (self ._fd , msg )
236- assert nbytes == len (msg ), "nbytes {0:n} but len(msg) {1:n}" .format (
237- nbytes , len (msg ))
280+ assert nbytes == len (msg ), f"{ nbytes = } != { len (msg )= } "
238281
282+ def _send (self , cmd , name , rtype ):
283+ if self ._use_simple_format and '\n ' not in name :
284+ msg = f"{ cmd } :{ name } :{ rtype } \n " .encode ("ascii" )
285+ if len (msg ) > 512 :
286+ # posix guarantees that writes to a pipe of less than PIPE_BUF
287+ # bytes are atomic, and that PIPE_BUF >= 512
288+ raise ValueError ('msg too long' )
289+ self ._ensure_running_and_write (msg )
290+ return
291+
292+ # POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
293+ # bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
294+ # POSIX shm_open() and sem_open() require the name, including its leading slash,
295+ # to be at most NAME_MAX bytes (255 on Linux)
296+ # With json.dump(..., ensure_ascii=True) every non-ASCII byte becomes a 6-char
297+ # escape like \uDC80.
298+ # As we want the overall message to be kept atomic and therefore smaller than 512,
299+ # we encode encode the raw name bytes with URL-safe Base64 - so a 255 long name
300+ # will not exceed 340 bytes.
301+ b = name .encode ('utf-8' , 'surrogateescape' )
302+ if len (b ) > 255 :
303+ raise ValueError ('shared memory name too long (max 255 bytes)' )
304+ b64 = base64 .urlsafe_b64encode (b ).decode ('ascii' )
305+
306+ payload = {"cmd" : cmd , "rtype" : rtype , "base64_name" : b64 }
307+ msg = (json .dumps (payload , ensure_ascii = True , separators = ("," , ":" )) + "\n " ).encode ("ascii" )
308+
309+ # The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
310+ assert len (msg ) <= 512 , f"internal error: message too long ({ len (msg )} bytes)"
311+ assert msg .startswith (b'{' )
312+
313+ self ._ensure_running_and_write (msg )
239314
240315_resource_tracker = ResourceTracker ()
241316ensure_running = _resource_tracker .ensure_running
@@ -244,6 +319,30 @@ def _send(self, cmd, name, rtype):
244319getfd = _resource_tracker .getfd
245320
246321
322+ def _decode_message (line ):
323+ if line .startswith (b'{' ):
324+ try :
325+ obj = json .loads (line .decode ('ascii' ))
326+ except Exception as e :
327+ raise ValueError ("malformed resource_tracker message: %r" % (line ,)) from e
328+
329+ cmd = obj ["cmd" ]
330+ rtype = obj ["rtype" ]
331+ b64 = obj .get ("base64_name" , "" )
332+
333+ if not isinstance (cmd , str ) or not isinstance (rtype , str ) or not isinstance (b64 , str ):
334+ raise ValueError ("malformed resource_tracker fields: %r" % (obj ,))
335+
336+ try :
337+ name = base64 .urlsafe_b64decode (b64 ).decode ('utf-8' , 'surrogateescape' )
338+ except ValueError as e :
339+ raise ValueError ("malformed resource_tracker base64_name: %r" % (b64 ,)) from e
340+ else :
341+ cmd , rest = line .strip ().decode ('ascii' ).split (':' , maxsplit = 1 )
342+ name , rtype = rest .rsplit (':' , maxsplit = 1 )
343+ return cmd , rtype , name
344+
345+
247346def main (fd ):
248347 '''Run resource tracker.'''
249348 # protect the process from ^C and "killall python" etc
@@ -266,7 +365,7 @@ def main(fd):
266365 with open (fd , 'rb' ) as f :
267366 for line in f :
268367 try :
269- cmd , name , rtype = line . strip (). decode ( 'ascii' ). split ( ':' )
368+ cmd , rtype , name = _decode_message ( line )
270369 cleanup_func = _CLEANUP_FUNCS .get (rtype , None )
271370 if cleanup_func is None :
272371 raise ValueError (
0 commit comments