17.4. concurrent.futures
— 启动并行任务¶
3.2 新版功能.
源码: Lib/concurrent/futures/thread.py 和 Lib/concurrent/futures/process.py
concurrent.futures
模块提供异步执行可调用对象高层接口。
异步执行可以由 ThreadPoolExecutor
使用线程或由 ProcessPoolExecutor
使用单独的进程来实现。 两者都是实现抽像类 Executor
定义的接口。
17.4.1. Executor 对象¶
-
class
concurrent.futures.
Executor
¶ 抽象类提供异步执行调用方法。要通过它的子类调用,而不是直接调用。
-
submit
(fn, *args, **kwargs)¶ 调度可调用对象 fn,以
fn(*args **kwargs)
方式执行并返回Future
对像代表可调用对象的执行。:with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
-
map
(func, *iterables, timeout=None, chunksize=1)¶ 类似于
map(func, *iterables)
函数,除了以下两点:iterables 是立即执行而不是延迟执行的;
func 是异步执行的,对 func 的多个调用可以并发执行。
如果从原始调用到
Executor.map()
经过 timeout 秒后,__next__()
已被调用且返回的结果还不可用,那么已返回的迭代器将触发concurrent.futures.TimeoutError
。 timeout 可以是整数或浮点数。如果 timeout 没有指定或为None
,则没有超时限制。如果 func 调用引发一个异常,当从迭代器中取回它的值时这个异常将被引发。
使用
ProcessPoolExecutor
时,这个方法会将 iterables 分割任务块并作为独立的任务并提交到执行池中。这些块的大概数量可以由 chunksize 指定正整数设置。 对很长的迭代器来说,使用大的 chunksize 值比默认值 1 能显著地提高性能。 chunksize 对ThreadPoolExecutor
没有效果。在 3.5 版更改: 加入 chunksize 参数。
-
shutdown
(wait=True)¶ 当待执行的 future 对象完成执行后向执行者发送信号,它就会释放正在使用的任何资源。 在关闭后调用
Executor.submit()
和Executor.map()
将会引发RuntimeError
。如果 wait 为
True
则此方法只有在所有待执行的 future 对象完成执行且释放已分配的资源后才会返回。 如果 wait 为False
,方法立即返回,所有待执行的 future 对象完成执行后会释放已分配的资源。 不管 wait 的值是什么,整个 Python 程序将等到所有待执行的 future 对象完成执行后才退出。如果使用
with
语句,你就可以避免显式调用这个方法,它将会停止Executor
(就好像Executor.shutdown()
调用时 wait 设为True
一样等待):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
-
17.4.2. ThreadPoolExecutor¶
ThreadPoolExecutor
是 Executor
的子类,它使用线程池来异步执行调用。
当回调已关联了一个 Future
然后再等待另一个 Future
的结果时就会发产死锁情况。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
与:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
-
class
concurrent.futures.
ThreadPoolExecutor
(max_workers=None, thread_name_prefix='')¶ Executor
子类使用最多 max_workers 个线程的线程池来异步执行调用。在 3.5 版更改: 如果 max_workers 为
None
或没有指定,将默认为机器处理器的个数,假如ThreadPoolExecutor
侧重于 I/O 操作而不是 CPU 运算,那么可以乘以5
,同时工作线程的数量可以比ProcessPoolExecutor
的数量高。3.6 新版功能: 添加 thread_name_prefix 参数允许用户控制由线程池创建的
threading.Thread
工作线程名称以方便调试。
17.4.2.1. ThreadPoolExecutor 例子¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
17.4.3. ProcessPoolExecutor¶
ProcessPoolExecutor
是 Executor
的子类,它使用进程池来实现异步执行调用。 ProcessPoolExecutor
使用 multiprocessing
回避 Global Interpreter Lock 但也意味着只可以处理和返回可序列化的对象。
__main__
模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor
不可以工作在交互式解释器中。
从可调用对象中调用 Executor
或 Future
的方法提交给 ProcessPoolExecutor
会导致死锁。
-
class
concurrent.futures.
ProcessPoolExecutor
(max_workers=None)¶ An
Executor
subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers isNone
or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to0
, then aValueError
will be raised.在 3.3 版更改: 如果其中一个工作进程被突然终止,
BrokenProcessPool
就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。
17.4.3.1. ProcessPoolExecutor 例子¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
17.4.4. Future 对象¶
Future
类将可调用对象封装为异步执行。Future
实例由 Executor.submit()
创建。
-
class
concurrent.futures.
Future
¶ 将可调用对象封装为异步执行。
Future
实例由Executor.submit()
创建,除非测试,不应直接创建。-
cancel
()¶ Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return
False
, otherwise the call will be cancelled and the method will returnTrue
.
-
cancelled
()¶ 如果调用成功取消返回
True
。
-
running
()¶ 如果调用正在执行而且不能被取消那么返回
True
。
-
done
()¶ 如果调用已被取消或正常结束那么返回
True
。
-
result
(timeout=None)¶ 返回调用返回的值。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,
concurrent.futures.TimeoutError
将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为None
,那么等待时间就没有限制。如果 futrue 在完成前被取消则
CancelledError
将被触发。如果调用引发了一个异常,这个方法也会引发同样的异常。
-
exception
(timeout=None)¶ 返回由调用引发的异常。如果调用还没完成那么这个方法将等待 timeout 秒。如果在 timeout 秒内没有执行完成,
concurrent.futures.TimeoutError
将会被触发。timeout 可以是整数或浮点数。如果 timeout 没有指定或为None
,那么等待时间就没有限制。如果 futrue 在完成前被取消则
CancelledError
将被触发。如果调用正常完成那么返回
None
。
-
add_done_callback
(fn)¶ 附加可调用 fn 到 future 对象。当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。
加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个
Exception
子类,它会被记录下来并被忽略掉。如果可调用对象引发一个BaseException
子类,这个行为没有定义。如果 future 对象已经完成或已取消,fn 会被立即调用。
下面这些
Future
方法用于单元测试和Executor
实现。-
set_running_or_notify_cancel
()¶ 这个方法只可以在执行关联
Future
工作之前由Executor
实现调用或由单测试调用。如果这个方法返回
False
那么Future
已被取消,即Future.cancel()
已被调用并返回True
。等待Future
完成 (即通过as_completed()
或wait()
) 的线程将被唤醒。如果这个方法返回
True
那么Future
不会被取消并已将它变为正在运行状态,也就是说调用Future.running()
时将返回 True。这个方法只可以被调用一次并且不能在调用
Future.set_result()
或Future.set_exception()
之后再调用。
-
17.4.5. 模块函数¶
-
concurrent.futures.
wait
(fs, timeout=None, return_when=ALL_COMPLETED)¶ Wait for the
Future
instances (possibly created by differentExecutor
instances) given by fs to complete. Returns a named 2-tuple of sets. The first set, nameddone
, contains the futures that completed (finished or were cancelled) before the wait completed. The second set, namednot_done
, contains uncompleted futures.timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为
None
,则不限制等待时间。return_when 指定此函数应在何时返回。它必须为以下常数之一:
常数
描述
FIRST_COMPLETED
函数将在任意可等待对象结束或取消时返回。
FIRST_EXCEPTION
函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于
ALL_COMPLETED
。ALL_COMPLETED
函数将在所有可等待对象结束或取消时返回。
-
concurrent.futures.
as_completed
(fs, timeout=None)¶ Returns an iterator over the
Future
instances (possibly created by differentExecutor
instances) given by fs that yields futures as they complete (finished or were cancelled). Any futures given by fs that are duplicated will be returned once. Any futures that completed beforeas_completed()
is called will be yielded first. The returned iterator raises aconcurrent.futures.TimeoutError
if__next__()
is called and the result isn’t available after timeout seconds from the original call toas_completed()
. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.
参见
- PEP 3148 – future 对象 - 异步执行指令。
该提案描述了Python标准库中包含的这个特性。
17.4.6. Exception类¶
-
exception
concurrent.futures.
CancelledError
¶ future 对象被取消时会触发。
-
exception
concurrent.futures.
TimeoutError
¶ future 对象执行超出给定的超时数值时引发。
-
exception
concurrent.futures.process.
BrokenProcessPool
¶ Derived from
RuntimeError
, this exception class is raised when one of the workers of aProcessPoolExecutor
has terminated in a non-clean fashion (for example, if it was killed from the outside).3.3 新版功能.