1+ ###############################################################################
2+ # Server process to keep track of unlinked resources (like shared memory
3+ # segments, semaphores etc.) and clean them.
14#
25# On Unix we run a server process which keeps track of unlinked
3- # semaphores . The server ignores SIGINT and SIGTERM and reads from a
6+ # resources . The server ignores SIGINT and SIGTERM and reads from a
47# pipe. Every other process of the program has a copy of the writable
58# end of the pipe, so we get EOF when all other processes have exited.
6- # Then the server process unlinks any remaining semaphore names.
7- #
8- # This is important because the system only supports a limited number
9- # of named semaphores, and they will not be automatically removed till
10- # the next reboot. Without this semaphore tracker process, "killall
11- # python" would probably leave unlinked semaphores.
9+ # Then the server process unlinks any remaining resource names.
1210#
11+ # This is important because there may be system limits for such resources: for
12+ # instance, the system only supports a limited number of named semaphores, and
13+ # shared-memory segments live in the RAM. If a python process leaks such a
14+ # resource, this resource will not be removed till the next reboot. Without
15+ # this resource tracker process, "killall python" would probably leave unlinked
16+ # resources.
1317
1418import os
1519import signal
1620import sys
1721import threading
1822import warnings
1923import _multiprocessing
24+ import _posixshmem
2025
2126from . import spawn
2227from . import util
2631_HAVE_SIGMASK = hasattr (signal , 'pthread_sigmask' )
2732_IGNORED_SIGNALS = (signal .SIGINT , signal .SIGTERM )
2833
34+ _CLEANUP_FUNCS = {
35+ 'noop' : lambda : None ,
36+ 'semaphore' : _multiprocessing .sem_unlink ,
37+ 'shared_memory' : _posixshmem .shm_unlink
38+ }
39+
2940
30- class SemaphoreTracker (object ):
41+ class ResourceTracker (object ):
3142
3243 def __init__ (self ):
3344 self ._lock = threading .Lock ()
@@ -39,13 +50,13 @@ def getfd(self):
3950 return self ._fd
4051
4152 def ensure_running (self ):
42- '''Make sure that semaphore tracker process is running.
53+ '''Make sure that resource tracker process is running.
4354
4455 This can be run from any process. Usually a child process will use
45- the semaphore created by its parent.'''
56+ the resource created by its parent.'''
4657 with self ._lock :
4758 if self ._fd is not None :
48- # semaphore tracker was launched before, is it still running?
59+ # resource tracker was launched before, is it still running?
4960 if self ._check_alive ():
5061 # => still alive
5162 return
@@ -55,24 +66,24 @@ def ensure_running(self):
5566 # Clean-up to avoid dangling processes.
5667 try :
5768 # _pid can be None if this process is a child from another
58- # python process, which has started the semaphore_tracker .
69+ # python process, which has started the resource_tracker .
5970 if self ._pid is not None :
6071 os .waitpid (self ._pid , 0 )
6172 except ChildProcessError :
62- # The semaphore_tracker has already been terminated.
73+ # The resource_tracker has already been terminated.
6374 pass
6475 self ._fd = None
6576 self ._pid = None
6677
67- warnings .warn ('semaphore_tracker : process died unexpectedly, '
68- 'relaunching. Some semaphores might leak.' )
78+ warnings .warn ('resource_tracker : process died unexpectedly, '
79+ 'relaunching. Some resources might leak.' )
6980
7081 fds_to_pass = []
7182 try :
7283 fds_to_pass .append (sys .stderr .fileno ())
7384 except Exception :
7485 pass
75- cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
86+ cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
7687 r , w = os .pipe ()
7788 try :
7889 fds_to_pass .append (r )
@@ -107,23 +118,23 @@ def _check_alive(self):
107118 try :
108119 # We cannot use send here as it calls ensure_running, creating
109120 # a cycle.
110- os .write (self ._fd , b'PROBE:0\n ' )
121+ os .write (self ._fd , b'PROBE:0:noop \n ' )
111122 except OSError :
112123 return False
113124 else :
114125 return True
115126
116- def register (self , name ):
117- '''Register name of semaphore with semaphore tracker.'''
118- self ._send ('REGISTER' , name )
127+ def register (self , name , rtype ):
128+ '''Register name of resource with resource tracker.'''
129+ self ._send ('REGISTER' , name , rtype )
119130
120- def unregister (self , name ):
121- '''Unregister name of semaphore with semaphore tracker.'''
122- self ._send ('UNREGISTER' , name )
131+ def unregister (self , name , rtype ):
132+ '''Unregister name of resource with resource tracker.'''
133+ self ._send ('UNREGISTER' , name , rtype )
123134
124- def _send (self , cmd , name ):
135+ def _send (self , cmd , name , rtype ):
125136 self .ensure_running ()
126- msg = '{0}:{1}\n ' .format (cmd , name ).encode ('ascii' )
137+ msg = '{0}:{1}:{2} \n ' .format (cmd , name , rtype ).encode ('ascii' )
127138 if len (name ) > 512 :
128139 # posix guarantees that writes to a pipe of less than PIPE_BUF
129140 # bytes are atomic, and that PIPE_BUF >= 512
@@ -133,14 +144,14 @@ def _send(self, cmd, name):
133144 nbytes , len (msg ))
134145
135146
136- _semaphore_tracker = SemaphoreTracker ()
137- ensure_running = _semaphore_tracker .ensure_running
138- register = _semaphore_tracker .register
139- unregister = _semaphore_tracker .unregister
140- getfd = _semaphore_tracker .getfd
147+ _resource_tracker = ResourceTracker ()
148+ ensure_running = _resource_tracker .ensure_running
149+ register = _resource_tracker .register
150+ unregister = _resource_tracker .unregister
151+ getfd = _resource_tracker .getfd
141152
142153def main (fd ):
143- '''Run semaphore tracker.'''
154+ '''Run resource tracker.'''
144155 # protect the process from ^C and "killall python" etc
145156 signal .signal (signal .SIGINT , signal .SIG_IGN )
146157 signal .signal (signal .SIGTERM , signal .SIG_IGN )
@@ -153,18 +164,24 @@ def main(fd):
153164 except Exception :
154165 pass
155166
156- cache = set ()
167+ cache = { rtype : set () for rtype in _CLEANUP_FUNCS . keys ()}
157168 try :
158- # keep track of registered/unregistered semaphores
169+ # keep track of registered/unregistered resources
159170 with open (fd , 'rb' ) as f :
160171 for line in f :
161172 try :
162- cmd , name = line .strip ().split (b':' )
163- if cmd == b'REGISTER' :
164- cache .add (name )
165- elif cmd == b'UNREGISTER' :
166- cache .remove (name )
167- elif cmd == b'PROBE' :
173+ cmd , name , rtype = line .strip ().decode ('ascii' ).split (':' )
174+ cleanup_func = _CLEANUP_FUNCS .get (rtype , None )
175+ if cleanup_func is None :
176+ raise ValueError (
177+ f'Cannot register { name } for automatic cleanup: '
178+ f'unknown resource type { rtype } ' )
179+
180+ if cmd == 'REGISTER' :
181+ cache [rtype ].add (name )
182+ elif cmd == 'UNREGISTER' :
183+ cache [rtype ].remove (name )
184+ elif cmd == 'PROBE' :
168185 pass
169186 else :
170187 raise RuntimeError ('unrecognized command %r' % cmd )
@@ -174,23 +191,23 @@ def main(fd):
174191 except :
175192 pass
176193 finally :
177- # all processes have terminated; cleanup any remaining semaphores
178- if cache :
179- try :
180- warnings .warn ('semaphore_tracker: There appear to be %d '
181- 'leaked semaphores to clean up at shutdown' %
182- len (cache ))
183- except Exception :
184- pass
185- for name in cache :
186- # For some reason the process which created and registered this
187- # semaphore has failed to unregister it. Presumably it has died.
188- # We therefore unlink it.
189- try :
190- name = name .decode ('ascii' )
194+ # all processes have terminated; cleanup any remaining resources
195+ for rtype , rtype_cache in cache .items ():
196+ if rtype_cache :
191197 try :
192- _multiprocessing .sem_unlink (name )
193- except Exception as e :
194- warnings .warn ('semaphore_tracker: %r: %s' % (name , e ))
195- finally :
196- pass
198+ warnings .warn ('resource_tracker: There appear to be %d '
199+ 'leaked %s objects to clean up at shutdown' %
200+ (len (rtype_cache ), rtype ))
201+ except Exception :
202+ pass
203+ for name in rtype_cache :
204+ # For some reason the process which created and registered this
205+ # resource has failed to unregister it. Presumably it has
206+ # died. We therefore unlink it.
207+ try :
208+ try :
209+ _CLEANUP_FUNCS [rtype ](name )
210+ except Exception as e :
211+ warnings .warn ('resource_tracker: %r: %s' % (name , e ))
212+ finally :
213+ pass
0 commit comments