Skip to content

Commit 15526f5

Browse files
pitrougiampaolo
andauthored
[3.7] bpo-35917: Test multiprocessing manager classes and shareable types (GH-11772) (GH-11780)
multiprocessing: provide unittests for manager classes and shareable types. (cherry picked from commit 2848d9d) Co-authored-by: Giampaolo Rodola <g.rodola@gmail.com>
1 parent 27f6e94 commit 15526f5

File tree

2 files changed

+239
-0
lines changed

2 files changed

+239
-0
lines changed

Lib/test/_test_multiprocessing.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4575,6 +4575,242 @@ def test_empty(self):
45754575

45764576
proc.join()
45774577

4578+
4579+
class TestSyncManagerTypes(unittest.TestCase):
4580+
"""Test all the types which can be shared between a parent and a
4581+
child process by using a manager which acts as an intermediary
4582+
between them.
4583+
4584+
In the following unit-tests the base type is created in the parent
4585+
process, the @classmethod represents the worker process and the
4586+
shared object is readable and editable between the two.
4587+
4588+
# The child.
4589+
@classmethod
4590+
def _test_list(cls, obj):
4591+
assert obj[0] == 5
4592+
assert obj.append(6)
4593+
4594+
# The parent.
4595+
def test_list(self):
4596+
o = self.manager.list()
4597+
o.append(5)
4598+
self.run_worker(self._test_list, o)
4599+
assert o[1] == 6
4600+
"""
4601+
manager_class = multiprocessing.managers.SyncManager
4602+
4603+
def setUp(self):
4604+
self.manager = self.manager_class()
4605+
self.manager.start()
4606+
self.proc = None
4607+
4608+
def tearDown(self):
4609+
if self.proc is not None and self.proc.is_alive():
4610+
self.proc.terminate()
4611+
self.proc.join()
4612+
self.manager.shutdown()
4613+
4614+
@classmethod
4615+
def setUpClass(cls):
4616+
support.reap_children()
4617+
4618+
tearDownClass = setUpClass
4619+
4620+
def wait_proc_exit(self):
4621+
# Only the manager process should be returned by active_children()
4622+
# but this can take a bit on slow machines, so wait a few seconds
4623+
# if there are other children too (see #17395).
4624+
join_process(self.proc)
4625+
start_time = time.monotonic()
4626+
t = 0.01
4627+
while len(multiprocessing.active_children()) > 1:
4628+
time.sleep(t)
4629+
t *= 2
4630+
dt = time.monotonic() - start_time
4631+
if dt >= 5.0:
4632+
test.support.environment_altered = True
4633+
print("Warning -- multiprocessing.Manager still has %s active "
4634+
"children after %s seconds"
4635+
% (multiprocessing.active_children(), dt),
4636+
file=sys.stderr)
4637+
break
4638+
4639+
def run_worker(self, worker, obj):
4640+
self.proc = multiprocessing.Process(target=worker, args=(obj, ))
4641+
self.proc.daemon = True
4642+
self.proc.start()
4643+
self.wait_proc_exit()
4644+
self.assertEqual(self.proc.exitcode, 0)
4645+
4646+
@classmethod
4647+
def _test_queue(cls, obj):
4648+
assert obj.qsize() == 2
4649+
assert obj.full()
4650+
assert not obj.empty()
4651+
assert obj.get() == 5
4652+
assert not obj.empty()
4653+
assert obj.get() == 6
4654+
assert obj.empty()
4655+
4656+
def test_queue(self, qname="Queue"):
4657+
o = getattr(self.manager, qname)(2)
4658+
o.put(5)
4659+
o.put(6)
4660+
self.run_worker(self._test_queue, o)
4661+
assert o.empty()
4662+
assert not o.full()
4663+
4664+
def test_joinable_queue(self):
4665+
self.test_queue("JoinableQueue")
4666+
4667+
@classmethod
4668+
def _test_event(cls, obj):
4669+
assert obj.is_set()
4670+
obj.wait()
4671+
obj.clear()
4672+
obj.wait(0.001)
4673+
4674+
def test_event(self):
4675+
o = self.manager.Event()
4676+
o.set()
4677+
self.run_worker(self._test_event, o)
4678+
assert not o.is_set()
4679+
o.wait(0.001)
4680+
4681+
@classmethod
4682+
def _test_lock(cls, obj):
4683+
obj.acquire()
4684+
4685+
def test_lock(self, lname="Lock"):
4686+
o = getattr(self.manager, lname)()
4687+
self.run_worker(self._test_lock, o)
4688+
o.release()
4689+
self.assertRaises(RuntimeError, o.release) # already released
4690+
4691+
@classmethod
4692+
def _test_rlock(cls, obj):
4693+
obj.acquire()
4694+
obj.release()
4695+
4696+
def test_rlock(self, lname="Lock"):
4697+
o = getattr(self.manager, lname)()
4698+
self.run_worker(self._test_rlock, o)
4699+
4700+
@classmethod
4701+
def _test_semaphore(cls, obj):
4702+
obj.acquire()
4703+
4704+
def test_semaphore(self, sname="Semaphore"):
4705+
o = getattr(self.manager, sname)()
4706+
self.run_worker(self._test_semaphore, o)
4707+
o.release()
4708+
4709+
def test_bounded_semaphore(self):
4710+
self.test_semaphore(sname="BoundedSemaphore")
4711+
4712+
@classmethod
4713+
def _test_condition(cls, obj):
4714+
obj.acquire()
4715+
obj.release()
4716+
4717+
def test_condition(self):
4718+
o = self.manager.Condition()
4719+
self.run_worker(self._test_condition, o)
4720+
4721+
@classmethod
4722+
def _test_barrier(cls, obj):
4723+
assert obj.parties == 5
4724+
obj.reset()
4725+
4726+
def test_barrier(self):
4727+
o = self.manager.Barrier(5)
4728+
self.run_worker(self._test_barrier, o)
4729+
4730+
@classmethod
4731+
def _test_pool(cls, obj):
4732+
# TODO: fix https://bugs.python.org/issue35919
4733+
with obj:
4734+
pass
4735+
4736+
def test_pool(self):
4737+
o = self.manager.Pool(processes=4)
4738+
self.run_worker(self._test_pool, o)
4739+
4740+
@classmethod
4741+
def _test_list(cls, obj):
4742+
assert obj[0] == 5
4743+
assert obj.count(5) == 1
4744+
assert obj.index(5) == 0
4745+
obj.sort()
4746+
obj.reverse()
4747+
for x in obj:
4748+
pass
4749+
assert len(obj) == 1
4750+
assert obj.pop(0) == 5
4751+
4752+
def test_list(self):
4753+
o = self.manager.list()
4754+
o.append(5)
4755+
self.run_worker(self._test_list, o)
4756+
assert not o
4757+
self.assertEqual(len(o), 0)
4758+
4759+
@classmethod
4760+
def _test_dict(cls, obj):
4761+
assert len(obj) == 1
4762+
assert obj['foo'] == 5
4763+
assert obj.get('foo') == 5
4764+
# TODO: fix https://bugs.python.org/issue35918
4765+
# assert obj.has_key('foo')
4766+
assert list(obj.items()) == [('foo', 5)]
4767+
assert list(obj.keys()) == ['foo']
4768+
assert list(obj.values()) == [5]
4769+
assert obj.copy() == {'foo': 5}
4770+
assert obj.popitem() == ('foo', 5)
4771+
4772+
def test_dict(self):
4773+
o = self.manager.dict()
4774+
o['foo'] = 5
4775+
self.run_worker(self._test_dict, o)
4776+
assert not o
4777+
self.assertEqual(len(o), 0)
4778+
4779+
@classmethod
4780+
def _test_value(cls, obj):
4781+
assert obj.value == 1
4782+
assert obj.get() == 1
4783+
obj.set(2)
4784+
4785+
def test_value(self):
4786+
o = self.manager.Value('i', 1)
4787+
self.run_worker(self._test_value, o)
4788+
self.assertEqual(o.value, 2)
4789+
self.assertEqual(o.get(), 2)
4790+
4791+
@classmethod
4792+
def _test_array(cls, obj):
4793+
assert obj[0] == 0
4794+
assert obj[1] == 1
4795+
assert len(obj) == 2
4796+
assert list(obj) == [0, 1]
4797+
4798+
def test_array(self):
4799+
o = self.manager.Array('i', [0, 1])
4800+
self.run_worker(self._test_array, o)
4801+
4802+
@classmethod
4803+
def _test_namespace(cls, obj):
4804+
assert obj.x == 0
4805+
assert obj.y == 1
4806+
4807+
def test_namespace(self):
4808+
o = self.manager.Namespace()
4809+
o.x = 0
4810+
o.y = 1
4811+
self.run_worker(self._test_namespace, o)
4812+
4813+
45784814
#
45794815
# Mixins
45804816
#
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
multiprocessing: provide unit tests for SyncManager and SharedMemoryManager
2+
classes + all the shareable types which are supposed to be supported by
3+
them. (patch by Giampaolo Rodola)

0 commit comments

Comments
 (0)