concurrent.futures --- 啟動平行任務

在 3.2 版被加入.

原始碼:Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模組提供了一個高階介面來非同步地 (asynchronously) 執行可呼叫物件 (callable) 。

The asynchronous execution can be performed with threads, using ThreadPoolExecutor or InterpreterPoolExecutor, or separate processes, using ProcessPoolExecutor. Each implements the same interface, which is defined by the abstract Executor class.

Availability: not WASI.

此模組在 WebAssembly 平台上不起作用或無法使用。更多資訊請參閱 WebAssembly 平台

Executor 物件

class concurrent.futures.Executor

提供非同步執行呼叫方法的抽象類別。不應直接使用它,而應透過其具體子類別來使用。

submit(fn, /, *args, **kwargs)

為可呼叫物件 fn 排程來以 fn(*args, **kwargs) 的形式執行並回傳一個表示可呼叫的執行的 Future 物件。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

類似於 map(fn, *iterables),除了:

  • iterables 立即被收集而不是延遲 (lazily) 收集;

  • fn 是非同步執行的,並且對 fn 的多次呼叫可以並行處理。

如果 __next__() 被呼叫,且在原先呼叫 Executor.map()timeout 秒後結果仍不可用,回傳的疊代器就會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 fn 呼叫引發例外,則當從疊代器中檢索到它的值時將引發該例外。

When using ProcessPoolExecutor, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor and InterpreterPoolExecutor, chunksize has no effect.

在 3.5 版的變更: 新增 chunksize 引數。

shutdown(wait=True, *, cancel_futures=False)

向 executor 發出訊號 (signal),表明它應該在當前未定 (pending) 的 future 完成執行時釋放它正在使用的任何資源。在關閉後呼叫 Executor.submit()Executor.map() 將引發 RuntimeError

如果 waitTrue 則此方法將不會回傳,直到所有未定的 futures 完成執行並且與 executor 關聯的資源都被釋放。如果 waitFalse 則此方法將立即回傳,並且當所有未定的 future 執行完畢時,與 executor 關聯的資源將被釋放。不管 wait 的值如何,整個 Python 程式都不會退出,直到所有未定的 futures 執行完畢。

如果 cancel_futuresTrue,此方法將取消 executor 尚未開始運行的所有未定 future。無論 cancel_futures 的值如何,任何已完成或正在運行的 future 都不會被取消。

如果 cancel_futureswait 都為 True,則 executor 已開始運行的所有 future 將在此方法回傳之前完成。剩餘的 future 被取消。

如果使用 with 陳述句,你就可以不用明確地呼叫此方法,這將會自己關閉 Executor(如同呼叫 Executor.shutdown()wait 被設定為 True 般等待):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

在 3.9 版的變更: 新增 cancel_futures

ThreadPoolExecutor

ThreadPoolExecutor 是一個 Executor 子類別,它使用執行緒池來非同步地執行呼叫。

當與 Future 關聯的可呼叫物件等待另一個 Future 的結果時,可能會發生死鎖 (deadlock)。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

一個 Executor 子類別,它使用最多有 max_workers 個執行緒的池來非同步地執行呼叫。

所有排隊到 ThreadPoolExecutor 的執行緒都將在直譯器退出之前加入。請注意,執行此操作的退出處理程式會在任何使用 atexit 新增的退出處理程式之前執行。這意味著必須捕獲並處理主執行緒中的例外,以便向執行緒發出訊號來正常退出 (gracefully exit)。因此,建議不要將 ThreadPoolExecutor 用於長時間運行的任務。

initializer 是一個可選的可呼叫物件,在每個工作執行緒開始時呼叫; initargs 是傳遞給 initializer 的引數元組 (tuple)。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交 (submit) 更多作業的嘗試都將引發 BrokenThreadPool

在 3.5 版的變更: 如果 max_workersNone 或未給定,它將預設為機器上的處理器數量乘以 5,這假定了 ThreadPoolExecutor 通常用於 I/O 重疊而非 CPU 密集的作業,並且 worker 的數量應該高於 ProcessPoolExecutor 的 worker 數量。

在 3.6 版的變更: 新增 thread_name_prefix 參數以允許使用者控制由池所建立的工作執行緒 (worker thread) 的 threading.Thread 名稱,以便於除錯。

在 3.7 版的變更: 新增 initializerinitargs 引數。

在 3.8 版的變更: max_workers 的預設值改為 min(32, os.cpu_count() + 4)。此預設值為 I/O 密集任務至少保留了 5 個 worker。它最多使用 32 個 CPU 核心來執行CPU 密集任務,以釋放 GIL。並且它避免了在多核機器上隱晦地使用非常大量的資源。

ThreadPoolExecutor 現在在啟動 max_workers 工作執行緒之前會重用 (reuse) 空閒的工作執行緒。

在 3.13 版的變更: Default value of max_workers is changed to min(32, (os.process_cpu_count() or 1) + 4).

ThreadPoolExecutor 範例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

InterpreterPoolExecutor

The InterpreterPoolExecutor class uses a pool of interpreters to execute calls asynchronously. It is a ThreadPoolExecutor subclass, which means each worker is running in its own thread. The difference here is that each worker has its own interpreter, and runs each task using that interpreter.

The biggest benefit to using interpreters instead of only threads is true multi-core parallelism. Each interpreter has its own Global Interpreter Lock, so code running in one interpreter can run on one CPU core, while code in another interpreter runs unblocked on a different core.

The tradeoff is that writing concurrent code for use with multiple interpreters can take extra effort. However, this is because it forces you to be deliberate about how and when interpreters interact, and to be explicit about what data is shared between interpreters. This results in several benefits that help balance the extra effort, including true multi-core parallelism, For example, code written this way can make it easier to reason about concurrency. Another major benefit is that you don't have to deal with several of the big pain points of using threads, like race conditions.

Each worker's interpreter is isolated from all the other interpreters. "Isolated" means each interpreter has its own runtime state and operates completely independently. For example, if you redirect sys.stdout in one interpreter, it will not be automatically redirected any other interpreter. If you import a module in one interpreter, it is not automatically imported in any other. You would need to import the module separately in interpreter where you need it. In fact, each module imported in an interpreter is a completely separate object from the same module in a different interpreter, including sys, builtins, and even __main__.

Isolation means a mutable object, or other data, cannot be used by more than one interpreter at the same time. That effectively means interpreters cannot actually share such objects or data. Instead, each interpreter must have its own copy, and you will have to synchronize any changes between the copies manually. Immutable objects and data, like the builtin singletons, strings, and tuples of immutable objects, don't have these limitations.

Communicating and synchronizing between interpreters is most effectively done using dedicated tools, like those proposed in PEP 734. One less efficient alternative is to serialize with pickle and then send the bytes over a shared socket or pipe.

class concurrent.futures.InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=(), shared=None)

A ThreadPoolExecutor subclass that executes calls asynchronously using a pool of at most max_workers threads. Each thread runs tasks in its own interpreter. The worker interpreters are isolated from each other, which means each has its own runtime state and that they can't share any mutable objects or other data. Each interpreter has its own Global Interpreter Lock, which means code run with this executor has true multi-core parallelism.

The optional initializer and initargs arguments have the same meaning as for ThreadPoolExecutor: the initializer is run when each worker is created, though in this case it is run.in the worker's interpreter. The executor serializes the initializer and initargs using pickle when sending them to the worker's interpreter.

備註

Functions defined in the __main__ module cannot be pickled and thus cannot be used.

備註

The executor may replace uncaught exceptions from initializer with ExecutionFailed.

The optional shared argument is a dict of objects that all interpreters in the pool share. The shared items are added to each interpreter's __main__ module. Not all objects are shareable. Shareable objects include the builtin singletons, str and bytes, and memoryview. See PEP 734 for more info.

Other caveats from parent ThreadPoolExecutor apply here.

submit() and map() work like normal, except the worker serializes the callable and arguments using pickle when sending them to its interpreter. The worker likewise serializes the return value when sending it back.

備註

Functions defined in the __main__ module cannot be pickled and thus cannot be used.

When a worker's current task raises an uncaught exception, the worker always tries to preserve the exception as-is. If that is successful then it also sets the __cause__ to a corresponding ExecutionFailed instance, which contains a summary of the original exception. In the uncommon case that the worker is not able to preserve the original as-is then it directly preserves the corresponding ExecutionFailed instance instead.

ProcessPoolExecutor

ProcessPoolExecutor 類別是一個 Executor 的子類別,它使用行程池來非同步地執行呼叫。ProcessPoolExecutor 使用了 multiprocessing 模組,這允許它避開全域直譯器鎖 (Global Interpreter Lock),但也意味著只能執行和回傳可被 pickle 的 (picklable) 物件。

__main__ 模組必須可以被工作子行程 (worker subprocess) 引入。這意味著 ProcessPoolExecutor 將無法在交互式直譯器 (interactive interpreter) 中工作。

從提交給 ProcessPoolExecutor 的可呼叫物件中呼叫 ExecutorFuture 方法將導致死鎖。

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to os.process_cpu_count(). If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used. See Contexts and start methods.

initializer 是一個可選的可呼叫物件,在每個工作行程 (worker process) 開始時呼叫;initargs 是傳遞給 initializer 的引數元組。如果 initializer 引發例外,所有當前未定的作業以及任何向池中提交更多作業的嘗試都將引發 BrokenProcessPool

max_tasks_per_child 是一個可選引數,它指定單個行程在退出並被新的工作行程替換之前可以執行的最大任務數。預設情況下 max_tasks_per_childNone,這意味著工作行程的生命週期將與池一樣長。當指定最大值時,在沒有 mp_context 參數的情況下,將預設使用 "spawn" 做為 multiprocessing 啟動方法。此功能與 "fork" 啟動方法不相容。

在 3.3 版的變更: 當其中一個工作行程突然終止時,現在會引發 BrokenProcessPool 錯誤。在過去,此行為是未定義的 (undefined),但對 executor 或其 future 的操作經常會發生凍結或死鎖。

在 3.7 版的變更: 新增了 mp_context 引數以允許使用者控制由池所建立的工作行程的 start_method。

新增 initializerinitargs 引數。

在 3.11 版的變更: 新增了 max_tasks_per_child 引數以允許使用者控制池中 worker 的生命週期。

在 3.12 版的變更: 在 POSIX 系統上,如果你的應用程式有多個執行緒並且 multiprocessing 情境使用了 "fork" 啟動方法:內部呼叫以產生 worker 的 os.fork() 函式可能會引發 DeprecationWarning。傳遞一個 mp_context 以配置為使用不同的啟動方法。更多說明請參閱 os.fork() 文件。

在 3.13 版的變更: max_workers uses os.process_cpu_count() by default, instead of os.cpu_count().

在 3.14 版的變更: The default process start method (see Contexts and start methods) changed away from fork. If you require the fork start method for ProcessPoolExecutor you must explicitly pass mp_context=multiprocessing.get_context("fork").

ProcessPoolExecutor 範例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future 物件

Future 類別封裝了可呼叫物件的非同步執行。Future 實例由 Executor.submit() 建立。

class concurrent.futures.Future

封裝可呼叫物件的非同步執行。Future 實例由 Executor.submit() 建立,且除測試外不應直接建立。

cancel()

嘗試取消呼叫。如果呼叫當前正在執行或已完成運行且無法取消,則該方法將回傳 False,否則呼叫將被取消並且該方法將回傳 True

cancelled()

如果該呼叫成功被取消,則回傳 True

running()

如果呼叫正在執行且無法取消,則回傳 True

done()

如果呼叫成功被取消或結束運行,則回傳 True

result(timeout=None)

回傳該呼叫回傳的值。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 future 在完成之前被取消,那麼 CancelledError 將被引發。

如果該呼叫引發了例外,此方法將引發相同的例外。

exception(timeout=None)

回傳該呼叫引發的例外。如果呼叫尚未完成,則此方法將等待至多 timeout 秒。如果呼叫在 timeout 秒內未完成,則會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就不會有限制。

如果 future 在完成之前被取消,那麼 CancelledError 將被引發。

如果呼叫在沒有引發的情況下完成,則回傳 None

add_done_callback(fn)

將可呼叫的 fn 附加到 future 上。當 future 被取消或完成運行時,fn 將被以 future 作為其唯一引數來呼叫。

新增的可呼叫物件按新增順序呼叫,並且始終在屬於新增它們的行程的執行緒中呼叫。如果可呼叫物件引發 Exception 子類別,它將被記錄 (log) 並忽略。如果可呼叫物件引發 BaseException 子類別,該行為未定義。

如果 future 已經完成或被取消,fn 將立即被呼叫。

以下 Future 方法旨在用於單元測試和 Executor 實作。

set_running_or_notify_cancel()

此方法只能在與 Future 關聯的工作被執行之前於 Executor 實作中呼叫,或者在單元測試中呼叫。

如果該方法回傳 FalseFuture 已被取消,即 Future.cancel() 被呼叫並回傳 True。任何等待 Future 完成的執行緒(即透過 as_completed()wait())將被喚醒。

如果該方法回傳 True 則代表 Future 未被取消並已進入運行狀態,意即呼叫 Future.running() 將回傳 True

此方法只能呼叫一次,且不能在呼叫 Future.set_result()Future.set_exception() 之後呼叫。

set_result(result)

將與 Future 關聯的工作結果設定為 result

此方法只能在 Executor 實作中和單元測試中使用。

在 3.8 版的變更: 如果 Future 已經完成,此方法會引發 concurrent.futures.InvalidStateError

set_exception(exception)

將與 Future 關聯的工作結果設定為 Exception exception

此方法只能在 Executor 實作中和單元測試中使用。

在 3.8 版的變更: 如果 Future 已經完成,此方法會引發 concurrent.futures.InvalidStateError

模組函式

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待 fs 給定的 Future 實例(可能由不同的 Executor 實例建立)完成。提供給 fs 的重複 future 將被刪除,並且只會回傳一次。回傳一個集合的附名二元組 (named 2-tuple of sets)。第一組名為 done,包含在等待完成之前完成的 future(已完成或被取消的 future)。第二組名為 not_done,包含未完成的 future(未定或運行中的 future)。

timeout 可用於控制回傳前等待的最大秒數。timeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就沒有限制。

return_when 表示此函式應回傳的時間。它必須是以下常數之一:

常數

描述

concurrent.futures.FIRST_COMPLETED

當任何 future 完成或被取消時,該函式就會回傳。

concurrent.futures.FIRST_EXCEPTION

該函式會在任何 future 透過引發例外而完結時回傳。如果 future 沒有引發例外,那麼它等同於 ALL_COMPLETED

concurrent.futures.ALL_COMPLETED

當所有 future 都完成或被取消時,該函式才會回傳。

concurrent.futures.as_completed(fs, timeout=None)

回傳由 fs 給定的 Future 實例(可能由不同的 Executor 實例建立)的疊代器,它在完成時產生 future(已完成或被取消的 future)。fs 給定的任何重複的 future 將只被回傳一次。呼叫 as_completed() 之前完成的任何 future 將首先產生。如果 __next__() 被呼叫,並且在原先呼叫 as_completed()timeout 秒後結果仍不可用,則回傳的疊代器會引發 TimeoutErrortimeout 可以是整數或浮點數。如果未指定 timeout 或為 None,則等待時間就沒有限制。

也參考

PEP 3148 -- futures - 非同步地執行運算

描述此功能並提出被包含於 Python 標準函式庫中的提案。

例外類別

exception concurrent.futures.CancelledError

當 future 被取消時引發。

exception concurrent.futures.TimeoutError

TimeoutError 的棄用別名,在 future 操作超過給定超時 (timeout) 時引發。

在 3.11 版的變更: 這個類別是 TimeoutError 的別名。

exception concurrent.futures.BrokenExecutor

衍生自 RuntimeError,當執行器因某種原因損壞時會引發此例外類別,並且不能用於提交或執行新任務。

在 3.7 版被加入.

exception concurrent.futures.InvalidStateError

當前狀態下不允許的 future 操作被執行時而引發。

在 3.8 版被加入.

exception concurrent.futures.thread.BrokenThreadPool

衍生自 BrokenExecutor,當 ThreadPoolExecutor 的其中一個 worker 初始化失敗時會引發此例外類別。

在 3.7 版被加入.

exception concurrent.futures.interpreter.BrokenInterpreterPool

Derived from BrokenThreadPool, this exception class is raised when one of the workers of a InterpreterPoolExecutor has failed initializing.

在 3.14 版被加入.

exception concurrent.futures.interpreter.ExecutionFailed

Raised from InterpreterPoolExecutor when the given initializer fails or from submit() when there's an uncaught exception from the submitted task.

在 3.14 版被加入.

exception concurrent.futures.process.BrokenProcessPool

衍生自 BrokenExecutor(以前為 RuntimeError),當 ProcessPoolExecutor 的其中一個 worker 以不乾淨的方式終止時將引發此例外類別(例如它是從外面被 kill 掉的)。

在 3.3 版被加入.