Skip to content

Commit 406376c

Browse files
committed
upgrade multiprocessing from 3.13.11
1 parent f306915 commit 406376c

File tree

3 files changed

+257
-79
lines changed

3 files changed

+257
-79
lines changed

Lib/multiprocessing/connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def arbitrary_address(family):
7474
if family == 'AF_INET':
7575
return ('localhost', 0)
7676
elif family == 'AF_UNIX':
77-
return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
77+
return tempfile.mktemp(prefix='sock-', dir=util.get_temp_dir())
7878
elif family == 'AF_PIPE':
7979
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
8080
(os.getpid(), next(_mmap_counter)), dir="")

Lib/multiprocessing/resource_tracker.py

Lines changed: 176 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@
1515
# this resource tracker process, "killall python" would probably leave unlinked
1616
# resources.
1717

18+
import base64
1819
import os
1920
import signal
2021
import sys
2122
import threading
2223
import warnings
24+
from collections import deque
25+
26+
import json
2327

2428
from . import spawn
2529
from . import util
@@ -66,6 +70,14 @@ def __init__(self):
6670
self._fd = None
6771
self._pid = None
6872
self._exitcode = None
73+
self._reentrant_messages = deque()
74+
75+
# True to use colon-separated lines, rather than JSON lines,
76+
# for internal communication. (Mainly for testing).
77+
# Filenames not supported by the simple format will always be sent
78+
# using JSON.
79+
# The reader should understand all formats.
80+
self._use_simple_format = True
6981

7082
def _reentrant_call_error(self):
7183
# gh-109629: this happens if an explicit call to the ResourceTracker
@@ -102,7 +114,7 @@ def _stop_locked(
102114
# This shouldn't happen (it might when called by a finalizer)
103115
# so we check for it anyway.
104116
if self._lock._recursion_count() > 1:
105-
return self._reentrant_call_error()
117+
raise self._reentrant_call_error()
106118
if self._fd is None:
107119
# not running
108120
return
@@ -113,7 +125,12 @@ def _stop_locked(
113125
close(self._fd)
114126
self._fd = None
115127

116-
_, status = waitpid(self._pid, 0)
128+
try:
129+
_, status = waitpid(self._pid, 0)
130+
except ChildProcessError:
131+
self._pid = None
132+
self._exitcode = None
133+
return
117134

118135
self._pid = None
119136

@@ -132,76 +149,119 @@ def ensure_running(self):
132149
133150
This can be run from any process. Usually a child process will use
134151
the resource created by its parent.'''
152+
return self._ensure_running_and_write()
153+
154+
def _teardown_dead_process(self):
155+
os.close(self._fd)
156+
157+
# Clean-up to avoid dangling processes.
158+
try:
159+
# _pid can be None if this process is a child from another
160+
# python process, which has started the resource_tracker.
161+
if self._pid is not None:
162+
os.waitpid(self._pid, 0)
163+
except ChildProcessError:
164+
# The resource_tracker has already been terminated.
165+
pass
166+
self._fd = None
167+
self._pid = None
168+
self._exitcode = None
169+
170+
warnings.warn('resource_tracker: process died unexpectedly, '
171+
'relaunching. Some resources might leak.')
172+
173+
def _launch(self):
174+
fds_to_pass = []
175+
try:
176+
fds_to_pass.append(sys.stderr.fileno())
177+
except Exception:
178+
pass
179+
r, w = os.pipe()
180+
try:
181+
fds_to_pass.append(r)
182+
# process will out live us, so no need to wait on pid
183+
exe = spawn.get_executable()
184+
args = [
185+
exe,
186+
*util._args_from_interpreter_flags(),
187+
'-c',
188+
f'from multiprocessing.resource_tracker import main;main({r})',
189+
]
190+
# bpo-33613: Register a signal mask that will block the signals.
191+
# This signal mask will be inherited by the child that is going
192+
# to be spawned and will protect the child from a race condition
193+
# that can make the child die before it registers signal handlers
194+
# for SIGINT and SIGTERM. The mask is unregistered after spawning
195+
# the child.
196+
prev_sigmask = None
197+
try:
198+
if _HAVE_SIGMASK:
199+
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
200+
pid = util.spawnv_passfds(exe, args, fds_to_pass)
201+
finally:
202+
if prev_sigmask is not None:
203+
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
204+
except:
205+
os.close(w)
206+
raise
207+
else:
208+
self._fd = w
209+
self._pid = pid
210+
finally:
211+
os.close(r)
212+
213+
def _make_probe_message(self):
214+
"""Return a probe message."""
215+
if self._use_simple_format:
216+
return b'PROBE:0:noop\n'
217+
return (
218+
json.dumps(
219+
{"cmd": "PROBE", "rtype": "noop"},
220+
ensure_ascii=True,
221+
separators=(",", ":"),
222+
)
223+
+ "\n"
224+
).encode("ascii")
225+
226+
def _ensure_running_and_write(self, msg=None):
135227
with self._lock:
136228
if self._lock._recursion_count() > 1:
137229
# The code below is certainly not reentrant-safe, so bail out
138-
return self._reentrant_call_error()
230+
if msg is None:
231+
raise self._reentrant_call_error()
232+
return self._reentrant_messages.append(msg)
233+
139234
if self._fd is not None:
140235
# resource tracker was launched before, is it still running?
141-
if self._check_alive():
142-
# => still alive
143-
return
144-
# => dead, launch it again
145-
os.close(self._fd)
146-
147-
# Clean-up to avoid dangling processes.
236+
if msg is None:
237+
to_send = self._make_probe_message()
238+
else:
239+
to_send = msg
148240
try:
149-
# _pid can be None if this process is a child from another
150-
# python process, which has started the resource_tracker.
151-
if self._pid is not None:
152-
os.waitpid(self._pid, 0)
153-
except ChildProcessError:
154-
# The resource_tracker has already been terminated.
155-
pass
156-
self._fd = None
157-
self._pid = None
158-
self._exitcode = None
241+
self._write(to_send)
242+
except OSError:
243+
self._teardown_dead_process()
244+
self._launch()
159245

160-
warnings.warn('resource_tracker: process died unexpectedly, '
161-
'relaunching. Some resources might leak.')
246+
msg = None # message was sent in probe
247+
else:
248+
self._launch()
162249

163-
fds_to_pass = []
164-
try:
165-
fds_to_pass.append(sys.stderr.fileno())
166-
except Exception:
167-
pass
168-
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
169-
r, w = os.pipe()
250+
while True:
170251
try:
171-
fds_to_pass.append(r)
172-
# process will out live us, so no need to wait on pid
173-
exe = spawn.get_executable()
174-
args = [exe] + util._args_from_interpreter_flags()
175-
args += ['-c', cmd % r]
176-
# bpo-33613: Register a signal mask that will block the signals.
177-
# This signal mask will be inherited by the child that is going
178-
# to be spawned and will protect the child from a race condition
179-
# that can make the child die before it registers signal handlers
180-
# for SIGINT and SIGTERM. The mask is unregistered after spawning
181-
# the child.
182-
prev_sigmask = None
183-
try:
184-
if _HAVE_SIGMASK:
185-
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
186-
pid = util.spawnv_passfds(exe, args, fds_to_pass)
187-
finally:
188-
if prev_sigmask is not None:
189-
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
190-
except:
191-
os.close(w)
192-
raise
193-
else:
194-
self._fd = w
195-
self._pid = pid
196-
finally:
197-
os.close(r)
252+
reentrant_msg = self._reentrant_messages.popleft()
253+
except IndexError:
254+
break
255+
self._write(reentrant_msg)
256+
if msg is not None:
257+
self._write(msg)
198258

199259
def _check_alive(self):
200260
'''Check that the pipe has not been closed by sending a probe.'''
201261
try:
202262
# We cannot use send here as it calls ensure_running, creating
203263
# a cycle.
204-
os.write(self._fd, b'PROBE:0:noop\n')
264+
os.write(self._fd, self._make_probe_message())
205265
except OSError:
206266
return False
207267
else:
@@ -215,27 +275,42 @@ def unregister(self, name, rtype):
215275
'''Unregister name of resource with resource tracker.'''
216276
self._send('UNREGISTER', name, rtype)
217277

218-
def _send(self, cmd, name, rtype):
219-
try:
220-
self.ensure_running()
221-
except ReentrantCallError:
222-
# The code below might or might not work, depending on whether
223-
# the resource tracker was already running and still alive.
224-
# Better warn the user.
225-
# (XXX is warnings.warn itself reentrant-safe? :-)
226-
warnings.warn(
227-
f"ResourceTracker called reentrantly for resource cleanup, "
228-
f"which is unsupported. "
229-
f"The {rtype} object {name!r} might leak.")
230-
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
231-
if len(msg) > 512:
232-
# posix guarantees that writes to a pipe of less than PIPE_BUF
233-
# bytes are atomic, and that PIPE_BUF >= 512
234-
raise ValueError('msg too long')
278+
def _write(self, msg):
235279
nbytes = os.write(self._fd, msg)
236-
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
237-
nbytes, len(msg))
280+
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
238281

282+
def _send(self, cmd, name, rtype):
283+
if self._use_simple_format and '\n' not in name:
284+
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
285+
if len(msg) > 512:
286+
# posix guarantees that writes to a pipe of less than PIPE_BUF
287+
# bytes are atomic, and that PIPE_BUF >= 512
288+
raise ValueError('msg too long')
289+
self._ensure_running_and_write(msg)
290+
return
291+
292+
# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on Linux)
293+
# bytes are atomic. Therefore, we want the message to be shorter than 512 bytes.
294+
# POSIX shm_open() and sem_open() require the name, including its leading slash,
295+
# to be at most NAME_MAX bytes (255 on Linux)
296+
# With json.dump(..., ensure_ascii=True) every non-ASCII byte becomes a 6-char
297+
# escape like \uDC80.
298+
# As we want the overall message to be kept atomic and therefore smaller than 512,
299+
# we encode encode the raw name bytes with URL-safe Base64 - so a 255 long name
300+
# will not exceed 340 bytes.
301+
b = name.encode('utf-8', 'surrogateescape')
302+
if len(b) > 255:
303+
raise ValueError('shared memory name too long (max 255 bytes)')
304+
b64 = base64.urlsafe_b64encode(b).decode('ascii')
305+
306+
payload = {"cmd": cmd, "rtype": rtype, "base64_name": b64}
307+
msg = (json.dumps(payload, ensure_ascii=True, separators=(",", ":")) + "\n").encode("ascii")
308+
309+
# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by construction.
310+
assert len(msg) <= 512, f"internal error: message too long ({len(msg)} bytes)"
311+
assert msg.startswith(b'{')
312+
313+
self._ensure_running_and_write(msg)
239314

240315
_resource_tracker = ResourceTracker()
241316
ensure_running = _resource_tracker.ensure_running
@@ -244,6 +319,30 @@ def _send(self, cmd, name, rtype):
244319
getfd = _resource_tracker.getfd
245320

246321

322+
def _decode_message(line):
323+
if line.startswith(b'{'):
324+
try:
325+
obj = json.loads(line.decode('ascii'))
326+
except Exception as e:
327+
raise ValueError("malformed resource_tracker message: %r" % (line,)) from e
328+
329+
cmd = obj["cmd"]
330+
rtype = obj["rtype"]
331+
b64 = obj.get("base64_name", "")
332+
333+
if not isinstance(cmd, str) or not isinstance(rtype, str) or not isinstance(b64, str):
334+
raise ValueError("malformed resource_tracker fields: %r" % (obj,))
335+
336+
try:
337+
name = base64.urlsafe_b64decode(b64).decode('utf-8', 'surrogateescape')
338+
except ValueError as e:
339+
raise ValueError("malformed resource_tracker base64_name: %r" % (b64,)) from e
340+
else:
341+
cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
342+
name, rtype = rest.rsplit(':', maxsplit=1)
343+
return cmd, rtype, name
344+
345+
247346
def main(fd):
248347
'''Run resource tracker.'''
249348
# protect the process from ^C and "killall python" etc
@@ -266,7 +365,7 @@ def main(fd):
266365
with open(fd, 'rb') as f:
267366
for line in f:
268367
try:
269-
cmd, name, rtype = line.strip().decode('ascii').split(':')
368+
cmd, rtype, name = _decode_message(line)
270369
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
271370
if cleanup_func is None:
272371
raise ValueError(

0 commit comments

Comments
 (0)