此文是 Python 3 concurrent.futures (New in 3.2) 标准库版的糙译,第三方库 concurrent.futures 基本一样。
- Executor Objects
- ThreadPoolExecutor
- ProcessPoolExecutor
- Future Objects
- [Module Functions](# Module-Functions)
- Exception classes
concurrent.futures模块给异步调用提供了一个高级接口。
异步执行的可以是线程,通过使用ThreadPoolExecutor,也可以进程,通过使用ProcessPoolExecutor。都实现了由抽象类Executor所定义的一致接口。
一个提供了方法去执行异步调用的抽象类。这个类不应该被直接使用,而应该通过具体的子类来使用。
submit(fn, *args, **kwargs)
通过调度 callable、fn 以fn(*args **kwargs)的形式执行,并且返回一个Future对象来表示 callable 的执行。
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())map(func, *iterables, timeout=None)
等价于map(func, *iterables),除了func是被异步执行并且这些函数调用可能同时发生。如果迭代器的__next__()被调用并且Executor.map()从开始调用的timeout秒之后还没有得到有效结果,将会引发TimeoutError。参数timeout可以是一个整数(int)或者浮点数(float),如果没有给定timeout或者给定的值为None,不会有等待时间的限制。如果一个调用引发了一个异常,异常会在它的值从迭代器重新取回时引发。
shutdown(wait=True)
给executor发送信号,告诉它应该释放当前已经完成执行的futures所使用的资源。在调用此方法之后再次调用Executor.submit()和Executor.map()将会引发RuntimeError异常。
如果wait为True,直到所有正在执行的futures完成执行,并且相关的资源被释放掉,这个方法才会返回。如果wait为False,这个方法将会立即返回,并且释放掉已经执行完成的futures所占用的资源。不管等待的耗费,在所有的futures完成执行之前整个Python程序都不会退出。
你应该避免调用这个方法如果你使用with语句的话,上下文管理器将会自动的关闭Executor(它会等待并调用wait设为True的Executor.shutdown())。
import shutil
with ThreadPoolExecutor(max_workers=4) as e:
e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src3.txt', 'dest4.txt')ThreadPoolExecutor是Executor的一个子类,以线程池的方式实现异步调用。
当一个与Future相关的调用正在等待另一个Future的结果,可能会引发死锁。
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)这种情况也是:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)一个Executor的子类,使用线程数最多为max_workers的池来实现异步调用。
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the url and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))ProcessPoolExecutor是Executor的子类,通过进程池来实现异步调用。ProcessPoolExecutor使用了multiprocessing 模块,这个模块避开了全局解释器锁(Global Interpreter Lock),但也意味着只有可pickle(序列化?)的对象能执行并返回。
对于__main__模块来说工作子进程必须是可导入的。也就是说ProcessPoolExecutor在交互解释器下是无法工作的。
从一个 callable 提交到ProcessPoolExecutor时调用Executor或者Future的方法会导致死锁。
Executor的子类,用进程数最多为max_workers的池来实现异步调用。如果没有给定max_workers或者为None,默认为机器处理器的数量。
Change in version3.3: 如果一个工作进程突然终止了,会引发一个
BrokenProcessPool异常。在以前的版本中,这样的行为是未定义的(不确定的),仅在 executor 中的操作,或者 future 会经常死掉,或者死锁。
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()Future类封装了一个 callable 的异步执行。Future实例通过Executor.submit()创建。
封装了一个 callable 的异步执行。Future实例被Executor.submit()创建,不应该被直接创建除了测试。
cancel()
尝试取消调用。如果调用是当前正在执行的并且不能被取消,这个方法返回False。否则这个调用会被取消并返回True。
cancelled()
返回True,如果调用被成功取消了。
running()
返回True,如果调用当前正在执行并且不能被取消。
done()
返回True,如果调用被成功取消了,或者已完成了运行。
result(timeout=None)
返回由调用返回的值。如果调用还未完成,这个方法将会等上timeout秒。如果调用在timeout秒之内都未完成,TimeoutError异常将会被抛出。timeout可以是整型(int)或浮点型(float),如果没有给定或者为None,等待时间是没有限制的。
exception(timeout=None)
返回一个由调用引发的异常。如果调用还未完成,这个方法将会等上timeout秒。如果调用在timeout秒之内都未完成,TimeoutError异常将会被抛出。timeout可以是整型(int)或浮点型(float),如果没有给定或者为None,等待时间是没有限制的。
如果 future 在完成之前被取消了,CancelledError 将会被抛出。
如果调用已完成且没有异常,返回None。
add_done_callback(fn)
将可调用的fn附在 future 上。以 future 作为唯一参数的fn将会被调用,当 future 被取消或者完成运行时。
被添加的 callable 按被添加的顺序调用,并且始终由属于添加它们的进程的线程调用。如果 callable 抛出的异常是Exception的子类,异常会被记录并忽略。如果 callable 抛出的异常是BaseException的子类,这种行为是未定义的(不确定的)。
如果 callable 已经完成或者被取消了,fn会立即执行。
下面的Future方法只应该用在单元测试或者Executor的实现中:
set_running_or_notify_cancel()
这个方法仅应该被Executor的实现调用,在单元测试之前执行与Future相关的工作。
如果方法返回False那么Future已经被取消了,例如,Future.cancel()被调用并且返回True。任何正在等待Future完成(例如,传给as_completed()或wait()的 future)的线程都会被唤醒。
如果返回True那么Future未被取消并被传到运行状态,例如,调用Future.running()会返回True。
这个方法只能被调用一次,并且不能在Future.set_result()和Future.set_exception()调用之后调用。
set_result(result)
给与Future相关的工作设置一个返回结果。
这个方法应该仅仅用于Executor的实现或者单元测试中。
set_exception(exception)
给与Future相关的工作的结果设置一个Exception异常。
这个方法应该仅仅用于Executor的实现或者单元测试中。
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由fs传入的所有Future实例(可能由不同的Executor创建)完成执行。返回一个已命名集合组成的二元组,第一个集合,叫done,包含等待完成之前(函数完成前??)所有已完成(完成或被取消)的 futures,第二个集合叫not_done,包含未完成的 futures。
timeout用来控制返回之前的最大等待时间(秒)。timeout可以是 int 或 float,如果没有给定或者为None,等待时间无限制。
return_when 表示什么时候函数应该返回。其值应该是一下常量之一:
| Constant | Description |
|---|---|
| FIRST_COMPLETED | 只要有任何一个 future 完成或取消,函数就会返回 |
| FIRST_EXCEPTION | 只要有任何一个 future 完成的时候发生异常,函数就会返回。如果没有一个 future 引发异常,等价于 ALL_COMPLETED |
| ALL_COMPLETED | 所有的 future 都完成或者被取消,函数才会返回 |
concurrent.futures.as_completed(fs, timeout=None)
返回一个迭代器,并依次返回作为fs传入的已完成(完成或被取消)的Future实例(可能由不同的Executor创建)。fs中任何重复的都只会返回一次。任何在调用as_completed()函数之前完成的 futures,将会被迭代器立即返回。如果迭代器的__next__()被调用了,并且在as_completed()被调用的timeout秒之后还未产生有效的值,将会引发TimeoutError。timeout可以是 int 或 float,如果没有给定或者为None,等待时间无限制。
See also: PEP 3148 – futures - execute computations asynchronously The proposal which described this feature for inclusion in the Python standard library.
exception concurrent.futures.process.BrokenProcessPool
从RunTimeError派生,当ProcessPoolExecutor 的一个工作进程在 non-clean fasion (例如,从外部被杀死)中被终止。