concurrent.futures --- 啟動平行任務

在 3.2 版被加入.

原始碼:Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。

非同步執行可以透過 ThreadPoolExecutor 來使用執行緒 (thread) 執行,或透過 ProcessPoolExecutor 來使用單獨行程 (process) 執行。兩者都實作了相同的介面,該介面由抽象的 Executor 類別定義。

Availability: not WASI.

此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱 WebAssembly 平台

Executor 物件

class concurrent.futures.Executor

提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。

submit(fn, /, *args, **kwargs)

為可呼叫物件 fn 排程來以 fn(*args, **kwargs) 的形式執行並回傳一個表示可呼叫的執行的 Future 物件。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

類似於 map(fn, *iterables),除了:

  • iterables 立即被收集而不是延遲 (lazily) 收集;

  • fn 是非同步執行的,並且對 fn 的多次呼叫可以並行處理。

如果 __next__() 被呼叫,且在原先呼叫 Executor.map()timeout 秒後結果仍不可用,回傳的疊代器就會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。

使用 ProcessPoolExecutor 時,此方法將 iterables 分成許多分塊 (chunks),並將其作為獨立的任務來提交給池 (pool)。可以透過將 chunksize 設定為正整數來指定這些分塊的(約略)大小。對於非常長的可疊代物件,chunksize 使用較大的值(與預設大小 1 相比)可以顯著提高性能。對於 ThreadPoolExecutorchunksize 無效。

在 3.5 版的變更: 新增 chunksize 引數。

shutdown(wait=True, *, cancel_futures=False)

向 executor 發出訊號 (signal),表明它應該在當前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫 Executor.submit()Executor.map() 將引發 RuntimeError

如果 waitTrue 則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 waitFalse 則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。

如果 cancel_futuresTrue,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。

如果 cancel_futureswait 都為 True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。

如果使用 with 陳述句,你就可以不用明確地呼叫此方法,這將會自己關閉 Executor(如同呼叫 Executor.shutdown()wait 被設定為 True 般等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版的變更: 新增 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一個 Executor 子類別,它使用執行緒池來非同步地執行呼叫。

當與 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖 (deadlock)。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一個 Executor 子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。

所有排隊到 ThreadPoolExecutor 的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用 atexit 新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將 ThreadPoolExecutor 用於長時間運行的任務。

initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發 BrokenThreadPool

在 3.5 版的變更: 如果 max_workersNone 或未給定,它將預設為機器上的處理器數量乘以 5,這假定了 ThreadPoolExecutor 通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於 ProcessPoolExecutor 的 worker 數量。

在 3.6 版的變更: 新增 thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的 threading.Thread 名稱,以便於除錯。

在 3.7 版的變更: 新增 initializerinitargs 引數。

在 3.8 版的變更: max_workers 的預設值改為 min(32, os.cpu_count() + 4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。

ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。

在 3.13 版的變更: Default value of max_workers is changed to min(32, (os.process_cpu_count() or 1) + 4).

ThreadPoolExecutor 範例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor 類別是一個 Executor 的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor 使用了 multiprocessing 模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。

__main__ 模組必須可以被工作子行程 (worker subprocess) 引入。這意味著 ProcessPoolExecutor 將無法在交互式直譯器 (interactive interpreter) 中工作。

從提交給 ProcessPoolExecutor 的可呼叫物件中呼叫 ExecutorFuture 方法將導致死鎖。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to os.process_cpu_count(). If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used. See Contexts and start methods.

initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交更多作業的嘗試都將引發 BrokenProcessPool

max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下 max_tasks_per_childNone,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有 mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。

在 3.3 版的變更: 當其中一個工作行程突然終止時,現在會引發 BrokenProcessPool 錯誤。在過去,此行為是未定義的 (undefined),但對 executor 或其 future 的操作經常會發生凍結或死鎖。

在 3.7 版的變更: 新增了 mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。

新增 initializerinitargs 引數。

備註

預設的 multiprocessing 啟動方法(請參閱 Contexts and start methods)將不再是 Python 3.14 中的 fork。需要 fork 用於其 ProcessPoolExecutor 的程式碼應透過傳遞 mp_context=multiprocessing.get_context("fork") 參數來明確指定。

在 3.11 版的變更: 新增了 max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。

在 3.12 版的變更: 在 POSIX 系統上,如果你的應用程式有多個執行緒並且 multiprocessing 情境使用了 "fork" 啟動方法:內部呼叫以產生 worker 的 os.fork() 函式可能會引發 DeprecationWarning。傳遞一個 mp_context 以配置為使用不同的啟動方法。更多說明請參閱 os.fork() 文件。

在 3.13 版的變更: max_workers uses os.process_cpu_count() by default, instead of os.cpu_count().

ProcessPoolExecutor 範例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 物件

Future 類別封裝了可呼叫物件的非同步執行。Future 實例由 Executor.submit() 建立。

class concurrent.futures.Future

封裝可呼叫物件的非同步執行。Future 實例由 Executor.submit() 建立,且除測試外不應直接建立。

cancel()

嘗試取消呼叫。如果呼叫當前正在執行或已完成運行且無法取消,則該方法將回傳 False,否則呼叫將被取消並且該方法將回傳 True

cancelled()

如果該呼叫成功被取消,則回傳 True

running()

如果呼叫正在執行且無法取消,則回傳 True

done()

如果呼叫成功被取消或結束運行,則回傳 True

result(timeout=None)

回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 future 在完成之前被取消,那麼 CancelledError 將被引發。

如果該呼叫引發了例外,此方法將引發相同的例外。

exception(timeout=None)

回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 future 在完成之前被取消,那麼 CancelledError 將被引發。

如果呼叫在沒有引發的情況下完成,則回傳 None

add_done_callback(fn)

將可呼叫的 fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。

新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發 Exception 子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發 BaseException 子類別,該行為未定義。

如果 future 已經完成或被取消,fn 將立即被呼叫。

以下 Future 方法旨在用於單元測試和 Executor 實作。

set_running_or_notify_cancel()

此方法只能在與 Future 關聯的工作被執行之前於 Executor 實作中呼叫,或者在單元測試中呼叫。

如果該方法回傳 FalseFuture 已被取消,即 Future.cancel() 被呼叫並回傳 True。任何等待 Future 完成的執行緒(即透過 as_completed()wait())將被喚醒。

如果該方法回傳 True 則代表 Future 未被取消並已進入運行狀態,意即呼叫 Future.running() 將回傳 True

此方法只能呼叫一次,且不能在呼叫 Future.set_result()Future.set_exception() 之後呼叫。

set_result(result)

將與 Future 關聯的工作結果設定為 result

此方法只能在 Executor 實作中和單元測試中使用。

在 3.8 版的變更: 如果 Future 已經完成,此方法會引發 concurrent.futures.InvalidStateError

set_exception(exception)

將與 Future 關聯的工作結果設定為 Exception exception

此方法只能在 Executor 實作中和單元測試中使用。

在 3.8 版的變更: 如果 Future 已經完成,此方法會引發 concurrent.futures.InvalidStateError

模組函式

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 fs 給定的 Future 實例(可能由不同的 Executor 實例建立)完成。提供給 fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為 done,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為 not_done,包含未完成的 future(未定或運行中的 future)。

timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就沒有限制。

return_when 表示此函式應回傳的時間。它必須是以下常數之一:

常數

描述

concurrent.futures.FIRST_COMPLETED

當任何 future 完成或被取消時,該函式就會回傳。

concurrent.futures.FIRST_EXCEPTION

該函式會在任何 future 透過引發例外而完結時回傳。如果 future 沒有引發例外,那麼它等同於 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

當所有 future 都完成或被取消時,該函式才會回傳。

concurrent.futures.as_completed(fs, timeout=None)

回傳由 fs 給定的 Future 實例(可能由不同的 Executor 實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫 as_completed() 之前完成的任何 future 將首先產生。如果 __next__() 被呼叫,並且在原先呼叫 as_completed()timeout 秒後結果仍不可用,則回傳的疊代器會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就沒有限制。

也參考

PEP 3148 -- futures - 非同步地執行運算

描述此功能並提出被包含於 Python 標準函式庫中的提案。

例外類別

exception concurrent.futures.CancelledError

當 future 被取消時引發。

exception concurrent.futures.TimeoutError

TimeoutError 的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。

在 3.11 版的變更: 這個類別是 TimeoutError 的別名。

exception concurrent.futures.BrokenExecutor

衍生自 RuntimeError,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。

在 3.7 版被加入.

exception concurrent.futures.InvalidStateError

當前狀態下不允許的 future 操作被執行時而引發。

在 3.8 版被加入.

exception concurrent.futures.thread.BrokenThreadPool

衍生自 BrokenExecutor,當 ThreadPoolExecutor 的其中一個 worker 初始化失敗時會引發此例外類別。

在 3.7 版被加入.

exception concurrent.futures.process.BrokenProcessPool

衍生自 BrokenExecutor(以前為 RuntimeError),當 ProcessPoolExecutor 的其中一個 worker 以不乾淨的方式終止時將引發此例外類別(例如它是從外面被 kill 掉的)。

在 3.3 版被加入.