diff -r fb965ee44d5e Lib/concurrent/futures/_base.py --- a/Lib/concurrent/futures/_base.py Sat May 28 14:48:19 2016 +0300 +++ b/Lib/concurrent/futures/_base.py Sat May 28 15:19:14 2016 +0200 @@ -170,6 +170,18 @@ return waiter + +def _yield_future(fs, waiter, ref_collect=()): + while fs: + + with fs[0]._condition: + fs[0]._waiters.remove(waiter) + + for future_list in ref_collect: + future_list.remove(fs[0]) + yield fs.pop(0) + + def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. @@ -191,6 +203,8 @@ if timeout is not None: end_time = timeout + time.time() + total_futures = len(fs) + fs = set(fs) with _AcquireFutures(fs): finished = set( @@ -198,9 +212,10 @@ if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) pending = fs - finished waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - + finished = list(finished) try: - yield from finished + yield from _yield_future(finished, waiter, + ref_collect=(fs,)) while pending: if timeout is None: @@ -210,7 +225,7 @@ if wait_timeout < 0: raise TimeoutError( '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) + len(pending), total_futures)) waiter.event.wait(wait_timeout) @@ -219,9 +234,8 @@ waiter.finished_futures = [] waiter.event.clear() - for future in finished: - yield future - pending.remove(future) + yield from _yield_future(finished, waiter, + ref_collect=(fs, pending)) finally: for f in fs: diff -r fb965ee44d5e Lib/test/test_concurrent_futures.py --- a/Lib/test/test_concurrent_futures.py Sat May 28 14:48:19 2016 +0300 +++ b/Lib/test/test_concurrent_futures.py Sat May 28 15:19:14 2016 +0200 @@ -358,6 +358,34 @@ completed = [f for f in futures.as_completed([future1,future1])] self.assertEqual(len(completed), 1) + def test_free_reference_yielded_future(self): + # Issue #14406: Generator should not keep reference + # for yielded futures. + futures_list = [Future() for _ in range(8)] + futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) + futures_list.append(create_future(state=SUCCESSFUL_FUTURE)) + + with self.assertRaises(futures.TimeoutError): + for future in futures.as_completed(futures_list, timeout=0): + futures_list.remove(future) + self.assertEqual(sys.getrefcount(future), 2) + + futures_list[0].set_result("test") + for future in futures.as_completed(futures_list): + futures_list.remove(future) + self.assertEqual(sys.getrefcount(future), 2) + if futures_list: + futures_list[0].set_result("test") + + def test_correct_timeout_exception_msg(self): + futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, + RUNNING_FUTURE, SUCCESSFUL_FUTURE] + + with self.assertRaises(futures.TimeoutError) as cm: + list(futures.as_completed(futures_list, timeout=0)) + + self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') + class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase): pass