concurrent.futures
--- 啟動平行任務¶
在 3.2 版被加入.
原始碼:Lib/concurrent/futures/thread.py 與 Lib/concurrent/futures/process.py
concurrent.futures
模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。
非同步執行可以透過 ThreadPoolExecutor
來使用執行緒 (thread) 執行,或透過 ProcessPoolExecutor
來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的 Executor
類別定義。
Availability: not WASI.
此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱 WebAssembly 平台。
Executor 物件¶
- class concurrent.futures.Executor¶
提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。
- submit(fn, /, *args, **kwargs)¶
為可呼叫物件 fn 排程來以
fn(*args, **kwargs)
的形式執行並回傳一個表示可呼叫的執行的Future
物件。with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
類似於
map(fn, *iterables)
,除了:iterables 立即被收集而不是延遲 (lazily) 收集;
fn 是非同步執行的,並且對 fn 的多次呼叫可以並行處理。
如果
__next__()
被呼叫,且在原先呼叫Executor.map()
的 timeout 秒後結果仍不可用,回傳的疊代器就會引發TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就不會有限制。如果 fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。
使用
ProcessPoolExecutor
時,此方法將 iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將 chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於ThreadPoolExecutor
,chunksize 無效。在 3.5 版的變更: 新增 chunksize 引數。
- shutdown(wait=True, *, cancel_futures=False)¶
向 executor 發出訊號 (signal),表明它應該在當前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫
Executor.submit()
和Executor.map()
將引發RuntimeError
。如果 wait 為
True
則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 wait 為False
則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。如果 cancel_futures 為
True
,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。如果 cancel_futures 和 wait 都為
True
,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。如果使用
with
陳述句,你就可以不用明確地呼叫此方法,這將會自己關閉Executor
(如同呼叫Executor.shutdown()
時 wait 被設定為True
般等待):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
在 3.9 版的變更: 新增 cancel_futures。
ThreadPoolExecutor¶
ThreadPoolExecutor
是一個 Executor
子類別,它使用執行緒池來非同步地執行呼叫。
當與 Future
關聯的可呼叫物件等待另一個 Future
的結果時,可能會發生死鎖 (deadlock)。例如:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
和:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
一個
Executor
子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。所有排隊到
ThreadPoolExecutor
的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用atexit
新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將ThreadPoolExecutor
用於長時間運行的任務。initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發
BrokenThreadPool
。在 3.5 版的變更: 如果 max_workers 為
None
或未給定,它將預設為機器上的處理器數量乘以5
,這假定了ThreadPoolExecutor
通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於ProcessPoolExecutor
的 worker 數量。在 3.6 版的變更: 新增 thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的
threading.Thread
名稱,以便於除錯。在 3.7 版的變更: 新增 initializer 與 initargs 引數。
在 3.8 版的變更: max_workers 的預設值改為
min(32, os.cpu_count() + 4)
。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。
在 3.13 版的變更: Default value of max_workers is changed to
min(32, (os.process_cpu_count() or 1) + 4)
.
ThreadPoolExecutor 範例¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
ProcessPoolExecutor
類別是一個 Executor
的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor
使用了 multiprocessing
模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。
__main__
模組必須可以被工作子行程 (worker subprocess) 引入。這意味著 ProcessPoolExecutor
將無法在交互式直譯器 (interactive interpreter) 中工作。
從提交給 ProcessPoolExecutor
的可呼叫物件中呼叫 Executor
或 Future
方法將導致死鎖。
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
An
Executor
subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers isNone
or not given, it will default toos.process_cpu_count()
. If max_workers is less than or equal to0
, then aValueError
will be raised. On Windows, max_workers must be less than or equal to61
. If it is not thenValueError
will be raised. If max_workers isNone
, then the default chosen will be at most61
, even if more processors are available. mp_context can be amultiprocessing
context orNone
. It will be used to launch the workers. If mp_context isNone
or not given, the defaultmultiprocessing
context is used. See Contexts and start methods.initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交更多作業的嘗試都將引發
BrokenProcessPool
。max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下 max_tasks_per_child 是
None
,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有 mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。在 3.3 版的變更: 當其中一個工作行程突然終止時,現在會引發
BrokenProcessPool
錯誤。在過去,此行為是未定義的 (undefined),但對 executor 或其 future 的操作經常會發生凍結或死鎖。在 3.7 版的變更: 新增了 mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。
新增 initializer 與 initargs 引數。
備註
預設的
multiprocessing
啟動方法(請參閱 Contexts and start methods)將不再是 Python 3.14 中的 fork。需要 fork 用於其ProcessPoolExecutor
的程式碼應透過傳遞mp_context=multiprocessing.get_context("fork")
參數來明確指定。在 3.11 版的變更: 新增了 max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。
在 3.12 版的變更: 在 POSIX 系統上,如果你的應用程式有多個執行緒並且
multiprocessing
情境使用了"fork"
啟動方法:內部呼叫以產生 worker 的os.fork()
函式可能會引發DeprecationWarning
。傳遞一個 mp_context 以配置為使用不同的啟動方法。更多說明請參閱os.fork()
文件。在 3.13 版的變更: max_workers uses
os.process_cpu_count()
by default, instead ofos.cpu_count()
.
ProcessPoolExecutor 範例¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future 物件¶
Future
類別封裝了可呼叫物件的非同步執行。Future
實例由 Executor.submit()
建立。
- class concurrent.futures.Future¶
封裝可呼叫物件的非同步執行。
Future
實例由Executor.submit()
建立,且除測試外不應直接建立。- cancel()¶
嘗試取消呼叫。如果呼叫當前正在執行或已完成運行且無法取消,則該方法將回傳
False
,否則呼叫將被取消並且該方法將回傳True
。
- cancelled()¶
如果該呼叫成功被取消,則回傳
True
。
- running()¶
如果呼叫正在執行且無法取消,則回傳
True
。
- done()¶
如果呼叫成功被取消或結束運行,則回傳
True
。
- result(timeout=None)¶
回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發
TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError
將被引發。如果該呼叫引發了例外,此方法將引發相同的例外。
- exception(timeout=None)¶
回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發
TimeoutError
。 timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就不會有限制。如果 future 在完成之前被取消,那麼
CancelledError
將被引發。如果呼叫在沒有引發的情況下完成,則回傳
None
。
- add_done_callback(fn)¶
將可呼叫的 fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。
新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發
Exception
子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發BaseException
子類別,該行為未定義。如果 future 已經完成或被取消,fn 將立即被呼叫。
以下
Future
方法旨在用於單元測試和Executor
實作。- set_running_or_notify_cancel()¶
此方法只能在與
Future
關聯的工作被執行之前於Executor
實作中呼叫,或者在單元測試中呼叫。如果該方法回傳
False
則Future
已被取消,即Future.cancel()
被呼叫並回傳True
。任何等待Future
完成的執行緒(即透過as_completed()
或wait()
)將被喚醒。如果該方法回傳
True
則代表Future
未被取消並已進入運行狀態,意即呼叫Future.running()
將回傳True
。此方法只能呼叫一次,且不能在呼叫
Future.set_result()
或Future.set_exception()
之後呼叫。
- set_result(result)¶
將與
Future
關聯的工作結果設定為 result。此方法只能在
Executor
實作中和單元測試中使用。在 3.8 版的變更: 如果
Future
已經完成,此方法會引發concurrent.futures.InvalidStateError
。
模組函式¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
等待 fs 給定的
Future
實例(可能由不同的Executor
實例建立)完成。提供給 fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為done
,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為not_done
,包含未完成的 future(未定或運行中的 future)。timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定 timeout 或為
None
,則等待時間就沒有限制。return_when 表示此函式應回傳的時間。它必須是以下常數之一:
常數
描述
- concurrent.futures.FIRST_COMPLETED¶
當任何 future 完成或被取消時,該函式就會回傳。
- concurrent.futures.FIRST_EXCEPTION¶
該函式會在任何 future 透過引發例外而完結時回傳。如果 future 沒有引發例外,那麼它等同於
ALL_COMPLETED
。- concurrent.futures.ALL_COMPLETED¶
當所有 future 都完成或被取消時,該函式才會回傳。
- concurrent.futures.as_completed(fs, timeout=None)¶
回傳由 fs 給定的
Future
實例(可能由不同的Executor
實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫as_completed()
之前完成的任何 future 將首先產生。如果__next__()
被呼叫,並且在原先呼叫as_completed()
的 timeout 秒後結果仍不可用,則回傳的疊代器會引發TimeoutError
。timeout 可以是整數或浮點數。如果未指定 timeout 或為None
,則等待時間就沒有限制。
也參考
- PEP 3148 -- futures - 非同步地執行運算
描述此功能並提出被包含於 Python 標準函式庫中的提案。
例外類別¶
- exception concurrent.futures.CancelledError¶
當 future 被取消時引發。
- exception concurrent.futures.TimeoutError¶
TimeoutError
的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。在 3.11 版的變更: 這個類別是
TimeoutError
的別名。
- exception concurrent.futures.BrokenExecutor¶
衍生自
RuntimeError
,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。在 3.7 版被加入.
- exception concurrent.futures.InvalidStateError¶
當前狀態下不允許的 future 操作被執行時而引發。
在 3.8 版被加入.
- exception concurrent.futures.thread.BrokenThreadPool¶
衍生自
BrokenExecutor
,當ThreadPoolExecutor
的其中一個 worker 初始化失敗時會引發此例外類別。在 3.7 版被加入.
- exception concurrent.futures.process.BrokenProcessPool¶
衍生自
BrokenExecutor
(以前為RuntimeError
),當ProcessPoolExecutor
的其中一個 worker 以不乾淨的方式終止時將引發此例外類別(例如它是從外面被 kill 掉的)。在 3.3 版被加入.