concurrent.futures
-- 啟動平行任務¶
在 3.2 版新加入.
原始碼:Lib/concurrent/futures/thread.py 與 Lib/concurrent/futures/process.py
concurrent.futures
模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。
非同步執行可以透過 ThreadPoolExecutor
來使用執行緒 (thread) 執行,或透過 ProcessPoolExecutor
來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的 Executor
類別定義。
適用:非 Emscripten、非 WASI。
此模組在 WebAssembly 平台 wasm32-emscripten
和 wasm32-wasi
上沒有作用或不可使用。更多資訊,請參閱 WebAssembly 平台。
Executor 物件¶
- class concurrent.futures.Executor¶
提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。
- submit(fn, /, *args, **kwargs)¶
為可呼叫物件 fn 排程來以
fn(*args, **kwargs)
的形式執行並回傳一個表示可呼叫的執行的Future
物件。with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
类似于
map(fn, *iterables)
但有以下差异:iterables 立即被收集而不是延遲 (lazily) 收集;
fn 是异步执行的并且可以并发对 fn 的多个调用。
如果
__next__()
被呼叫,且在原先呼叫Executor.map()
的 timeout 秒後結果仍不可用,回傳的疊代器就會引發TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就不會有限制。如果 fn 调用引发了异常,那么当从迭代器获取其值时该异常将被引发。
使用
ProcessPoolExecutor
時,此方法將 iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將 chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於ThreadPoolExecutor
,chunksize 無效。在 3.5 版的變更: 新增 chunksize 引數。
- shutdown(wait=True, *, cancel_futures=False)¶
向 executor 發出訊號 (signal),表明它應該在當前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫
Executor.submit()
和Executor.map()
將引發RuntimeError
。如果 wait 為
True
則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 wait 為False
則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。如果 cancel_futures 為
True
,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。如果 cancel_futures 和 wait 都為
True
,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。如果使用
with
陳述句,你就可以不用明確地呼叫此方法,這將會自己關閉Executor
(如同呼叫Executor.shutdown()
時 wait 被設定為True
般等待):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版的變更: 新增 cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor
是一個 Executor
子類別,它使用執行緒池來非同步地執行呼叫。
當與 Future
關聯的可呼叫物件等待另一個 Future
的結果時,可能會發生死鎖 (deadlock)。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
一個
Executor
子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。所有排隊到
ThreadPoolExecutor
的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用atexit
新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將ThreadPoolExecutor
用於長時間運行的任務。initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發
BrokenThreadPool
。在 3.5 版的變更: 如果 max_workers 為
None
或未給定,它將預設為機器上的處理器數量乘以5
,這假定了ThreadPoolExecutor
通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於ProcessPoolExecutor
的 worker 數量。在 3.6 版的變更: 增加了 thread_name_prefix 形参来允许用户控制由线程池创建的
threading.Thread
工作线程名称以方便调试。在 3.7 版的變更: 新增 initializer 與 initargs 引數。
在 3.8 版的變更: max_workers 的預設值改為
min(32, os.cpu_count() + 4)
。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。
ThreadPoolExecutor 範例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
ProcessPoolExecutor
類別是一個 Executor
的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor
使用了 multiprocessing
模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。
__main__
模組必須可以被工作子行程 (worker subprocess) 引入。這意味著 ProcessPoolExecutor
將無法在交互式直譯器 (interactive interpreter) 中工作。
從提交給 ProcessPoolExecutor
的可呼叫物件中呼叫 Executor
或 Future
方法將導致死鎖。
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
一個
Executor
子類別,它使用了最多有 max_workers 個行程的池來非同步地執行呼叫。如果 max_workers 為None
或未給定,它將被預設為機器上的處理器數量。如果 max_workers 小於或等於0
,則會引發ValueError
。在 Windows 上,max_workers 必須小於或等於61
。如果不是,則會引發ValueError
。如果 max_workers 為None
,則預設選擇最多為61
,即便有更多處理器可用。mp_context 可以是 multiprocessing 情境 (context) 或 None。它將用於啟動 worker。如果 mp_context 為None
或未給定,則使用預設的 multiprocessing 情境。initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交更多作業的嘗試都將引發
BrokenProcessPool
。max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下 max_tasks_per_child 是
None
,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有 mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。在 3.3 版的變更: 当某个工作进程突然终止时,现在将引发
BrokenProcessPool
。 在之前版本中,它的行为是未定义的但在执行器上的操作或它的 future 对象往往会被冻结或死锁。在 3.7 版的變更: 新增了 mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。
新增 initializer 與 initargs 引數。
在 3.11 版的變更: 新增了 max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。
ProcessPoolExecutor 範例¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future 物件¶
Future
類別封裝了可呼叫物件的非同步執行。Future
實例由 Executor.submit()
建立。
- class concurrent.futures.Future¶
封裝可呼叫物件的非同步執行。
Future
實例由Executor.submit()
建立,且除測試外不應直接建立。- cancel()¶
嘗試取消呼叫。如果呼叫當前正在執行或已完成運行且無法取消,則該方法將回傳
False
,否則呼叫將被取消並且該方法將回傳True
。
- cancelled()¶
如果該呼叫成功被取消,則回傳
True
。
- running()¶
如果呼叫正在執行且無法取消,則回傳
True
。
- done()¶
如果呼叫成功被取消或結束運行,則回傳
True
。
- result(timeout=None)¶
回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發
TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError
將被引發。如果該呼叫引發了例外,此方法將引發相同的例外。
- exception(timeout=None)¶
回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發
TimeoutError
。 timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError
將被引發。如果呼叫在沒有引發的情況下完成,則回傳
None
。
- add_done_callback(fn)¶
將可呼叫的 fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。
新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發
Exception
子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發BaseException
子類別,該行為未定義。如果 future 已經完成或被取消,fn 將立即被呼叫。
以下
Future
方法旨在用於單元測試和Executor
實作。- set_running_or_notify_cancel()¶
此方法只能在與
Future
關聯的工作被執行之前於Executor
實作中呼叫,或者在單元測試中呼叫。如果該方法回傳
False
則Future
已被取消,即Future.cancel()
被呼叫並回傳True
。任何等待Future
完成的執行緒(即透過as_completed()
或wait()
)將被喚醒。如果該方法回傳
True
則代表Future
未被取消並已進入運行狀態,意即呼叫Future.running()
將回傳True
。此方法只能呼叫一次,且不能在呼叫
Future.set_result()
或Future.set_exception()
之後呼叫。
- set_result(result)¶
將與
Future
關聯的工作結果設定為 result。此方法只能在
Executor
實作中和單元測試中使用。在 3.8 版的變更: 如果
Future
已經完成,此方法會引發concurrent.futures.InvalidStateError
。
模組函式¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
等待 fs 給定的
Future
實例(可能由不同的Executor
實例建立)完成。提供給 fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為done
,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為not_done
,包含未完成的 future(未定或運行中的 future)。timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定 timeout 或為
None
,則等待時間就沒有限制。return_when 表示此函式應回傳的時間。它必須是以下常數之一:
常數
描述
- concurrent.futures.FIRST_COMPLETED¶
當任何 future 完成或被取消時,該函式就會回傳。
- concurrent.futures.FIRST_EXCEPTION¶
该函数将在任何 future 对象通过引发异常而结束时返回。 如果没有任何 future 对象引发引发那么它将等价于
ALL_COMPLETED
。- concurrent.futures.ALL_COMPLETED¶
當所有 future 都完成或被取消時,該函式才會回傳。
- concurrent.futures.as_completed(fs, timeout=None)¶
回傳由 fs 給定的
Future
實例(可能由不同的Executor
實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫as_completed()
之前完成的任何 future 將首先產生。如果__next__()
被呼叫,並且在原先呼叫as_completed()
的 timeout 秒後結果仍不可用,則回傳的疊代器會引發TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就沒有限制。
也參考
- PEP 3148 -- futures - 非同步地執行運算
描述此功能並提出被包含於 Python 標準函式庫中的提案。
例外類別¶
- exception concurrent.futures.CancelledError¶
當 future 被取消時引發。
- exception concurrent.futures.TimeoutError¶
TimeoutError
的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。在 3.11 版的變更: 這個類別是
TimeoutError
的別名。
- exception concurrent.futures.BrokenExecutor¶
衍生自
RuntimeError
,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。在 3.7 版新加入.
- exception concurrent.futures.InvalidStateError¶
當前狀態下不允許的 future 操作被執行時而引發。
在 3.8 版新加入.
- exception concurrent.futures.thread.BrokenThreadPool¶
派生自
BrokenExecutor
,这个异常类会在ThreadPoolExecutor
的某个工作线程初始化失败时被引发。在 3.7 版新加入.
- exception concurrent.futures.process.BrokenProcessPool¶
派生自
BrokenExecutor
(原为RuntimeError
),这个异常类会在ProcessPoolExecutor
的某个工作进程以不完整的方式终结(例如,从外部杀掉)时被引发。在 3.3 版新加入.