concurrent.futures --- 啟動平行任務¶
在 3.2 版被加入.
原始碼:Lib/concurrent/futures/thread.py 與 Lib/concurrent/futures/process.py
concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。
非同步執行可以透過 ThreadPoolExecutor 來使用執行緒 (thread) 執行,或透過 ProcessPoolExecutor 來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的 Executor 類別定義。
可用性: not WASI.
此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱 WebAssembly 平台。
Executor 物件¶
- class concurrent.futures.Executor¶
提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。
- submit(fn, /, *args, **kwargs)¶
為可呼叫物件 fn 排程來以
fn(*args, **kwargs)的形式執行並回傳一個表示可呼叫的執行的Future物件。with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
類似於
map(fn, *iterables),除了:iterables 立即被收集而不是延遲 (lazily) 收集;
fn 是非同步執行的,並且對 fn 的多次呼叫可以並行處理。
如果
__next__()被呼叫,且在原先呼叫Executor.map()的 timeout 秒後結果仍不可用,回傳的疊代器就會引發TimeoutError。timeout 可以是整數或浮點數。如果未指定 timeout 或為None,則等待時間就不會有限制。如果 fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。
使用
ProcessPoolExecutor時,此方法將 iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將 chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於ThreadPoolExecutor,chunksize 無效。在 3.5 版的變更: 新增 chunksize 引數。
- shutdown(wait=True, *, cancel_futures=False)¶
向 executor 發出訊號 (signal),表明它應該在目前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫
Executor.submit()和Executor.map()將引發RuntimeError。如果 wait 為
True則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 wait 為False則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。如果 cancel_futures 為
True,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。如果 cancel_futures 和 wait 都為
True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。如果透過
with陳述式來將 executor 作為情境管理器使用,你就可以不用明確地呼叫此方法,這將會自己關閉Executor(如同呼叫Executor.shutdown()時 wait 被設定為True般等待):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版的變更: 新增 cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor 是一個 Executor 子類別,它使用執行緒池來非同步地執行呼叫。
當與 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖 (deadlock)。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b 永遠不會完成,因為它正在等待 a。
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a 永遠不會完成,因為它正在等待 b。
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# 這將永遠不會完成,因為只有一個工作執行緒且
# 它正在執行這個函式。
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
一個
Executor子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。所有排隊到
ThreadPoolExecutor的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用atexit新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將ThreadPoolExecutor用於長時間運行的任務。initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有目前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發
BrokenThreadPool。在 3.5 版的變更: 如果 max_workers 為
None或未給定,它將預設為機器上的處理器數量乘以5,這假定了ThreadPoolExecutor通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於ProcessPoolExecutor的 worker 數量。在 3.6 版的變更: 新增 thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的
threading.Thread名稱,以便於除錯。在 3.7 版的變更: 新增 initializer 與 initargs 引數。
在 3.8 版的變更: max_workers 的預設值改為
min(32, os.cpu_count() + 4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。
在 3.13 版的變更: max_workers 的預設值被改為
min(32, (os.process_cpu_count() or 1) + 4)。
ThreadPoolExecutor 範例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
ProcessPoolExecutor 類別是一個 Executor 的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor 使用了 multiprocessing 模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。
__main__ 模組必須可以被工作子行程 (worker subprocess) 引入。這意味著 ProcessPoolExecutor 將無法在交互式直譯器 (interactive interpreter) 中工作。
從提交給 ProcessPoolExecutor 的可呼叫物件中呼叫 Executor 或 Future 方法將導致死鎖。
Note that the restrictions on functions and arguments needing to picklable as
per multiprocessing.Process apply when using submit()
and map() on a ProcessPoolExecutor. A function defined
in a REPL or a lambda should not be expected to work.
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
一個
Executor子類別,它使用了最多有 max_workers 個行程的池來非同步地執行呼叫。如果 max_workers 為None或未給定,它將被預設為os.process_cpu_count()。如果 max_workers 小於或等於0,則會引發ValueError。在 Windows 上,max_workers 必須小於或等於61。如果不是,則會引發ValueError。如果 max_workers 為None,則預設選擇最多為61,即便有更多處理器可用。mp_context 可以是multiprocessing情境 (context) 或None。它將用於啟動 worker。如果 mp_context 為None或未給定,則使用預設的multiprocessing情境。請見 Contexts and start methods。initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果 initializer 引發例外,所有目前未定的作業以及任何向池中提交更多作業的嘗試都將引發
BrokenProcessPool。max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下 max_tasks_per_child 是
None,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有 mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。在 3.3 版的變更: 當其中一個工作行程突然終止時,現在會引發
BrokenProcessPool錯誤。在過去,此行為是未定義的 (undefined),但對 executor 或其 future 的操作經常會發生凍結或死鎖。在 3.7 版的變更: 新增了 mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。
新增 initializer 與 initargs 引數。
備註
預設的
multiprocessing啟動方法(請參閱 Contexts and start methods)將不再是 Python 3.14 中的 fork。需要 fork 用於其ProcessPoolExecutor的程式碼應透過傳遞mp_context=multiprocessing.get_context("fork")參數來明確指定。在 3.11 版的變更: 新增了 max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。
在 3.12 版的變更: 在 POSIX 系統上,如果你的應用程式有多個執行緒並且
multiprocessing情境使用了"fork"啟動方法:內部呼叫以產生 worker 的os.fork()函式可能會引發DeprecationWarning。傳遞一個 mp_context 以配置為使用不同的啟動方法。更多說明請參閱os.fork()文件。在 3.13 版的變更: max_workers uses
os.process_cpu_count()by default, instead ofos.cpu_count().
ProcessPoolExecutor 範例¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future 物件¶
Future 類別封裝了可呼叫物件的非同步執行。Future 實例由 Executor.submit() 建立。
- class concurrent.futures.Future¶
封裝可呼叫物件的非同步執行。
Future實例由Executor.submit()建立,且除測試外不應直接建立。- cancel()¶
嘗試取消呼叫。如果呼叫目前正在執行或已完成運行且無法取消,則該方法將回傳
False,否則呼叫將被取消並且該方法將回傳True。
- cancelled()¶
如果該呼叫成功被取消,則回傳
True。
- running()¶
如果呼叫正在執行且無法取消,則回傳
True。
- done()¶
如果呼叫成功被取消或結束運行,則回傳
True。
- result(timeout=None)¶
回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發
TimeoutError。timeout 可以是整數或浮點數。如果未指定 timeout 或為None,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError將被引發。如果該呼叫引發了例外,此方法將引發相同的例外。
- exception(timeout=None)¶
回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發
TimeoutError。 timeout 可以是整數或浮點數。如果未指定 timeout 或為None,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError將被引發。如果呼叫在沒有引發的情況下完成,則回傳
None。
- add_done_callback(fn)¶
將可呼叫的 fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。
新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發
Exception子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發BaseException子類別,該行為未定義。如果 future 已經完成或被取消,fn 將立即被呼叫。
以下
Future方法旨在用於單元測試和Executor實作。- set_running_or_notify_cancel()¶
此方法只能在與
Future關聯的工作被執行之前於Executor實作中呼叫,或者在單元測試中呼叫。如果該方法回傳
False則Future已被取消,即Future.cancel()被呼叫並回傳True。任何等待Future完成的執行緒(即透過as_completed()或wait())將被喚醒。如果該方法回傳
True則代表Future未被取消並已進入運行狀態,意即呼叫Future.running()將回傳True。此方法只能呼叫一次,且不能在呼叫
Future.set_result()或Future.set_exception()之後呼叫。
- set_result(result)¶
將與
Future關聯的工作結果設定為 result。此方法只能在
Executor實作中和單元測試中使用。在 3.8 版的變更: 如果
Future已經完成,此方法會引發concurrent.futures.InvalidStateError。
模組函式¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
等待 fs 給定的
Future實例(可能由不同的Executor實例建立)完成。提供給 fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為done,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為not_done,包含未完成的 future(未定或運行中的 future)。timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定 timeout 或為
None,則等待時間就沒有限制。return_when 表示此函式應回傳的時間。它必須是以下常數之一:
常數
描述
- concurrent.futures.FIRST_COMPLETED¶
當任何 future 完成或被取消時,該函式就會回傳。
- concurrent.futures.FIRST_EXCEPTION¶
該函式會在任何 future 透過引發例外而完結時回傳。如果 future 沒有引發例外,那麼它等同於
ALL_COMPLETED。- concurrent.futures.ALL_COMPLETED¶
當所有 future 都完成或被取消時,該函式才會回傳。
- concurrent.futures.as_completed(fs, timeout=None)¶
回傳由 fs 給定的
Future實例(可能由不同的Executor實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫as_completed()之前完成的任何 future 將首先產生。如果__next__()被呼叫,並且在原先呼叫as_completed()的 timeout 秒後結果仍不可用,則回傳的疊代器會引發TimeoutError。timeout 可以是整數或浮點數。如果未指定 timeout 或為None,則等待時間就沒有限制。
也參考
- PEP 3148 -- futures - 非同步地執行運算
描述此功能並提出被包含於 Python 標準函式庫中的提案。
例外類別¶
- exception concurrent.futures.CancelledError¶
當 future 被取消時引發。
- exception concurrent.futures.TimeoutError¶
TimeoutError的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。在 3.11 版的變更: 這個類別是
TimeoutError的別名。
- exception concurrent.futures.BrokenExecutor¶
衍生自
RuntimeError,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。在 3.7 版被加入.
- exception concurrent.futures.InvalidStateError¶
目前狀態下不允許的 future 操作被執行時而引發。
在 3.8 版被加入.
- exception concurrent.futures.thread.BrokenThreadPool¶
衍生自
BrokenExecutor,當ThreadPoolExecutor的其中一個 worker 初始化失敗時會引發此例外類別。在 3.7 版被加入.
- exception concurrent.futures.process.BrokenProcessPool¶
衍生自
BrokenExecutor(以前為RuntimeError),當ProcessPoolExecutor的其中一個 worker 以不乾淨的方式終止時將引發此例外類別(例如它是從外面被 kill 掉的)。在 3.3 版被加入.