concurrent.futures --- Launching parallel tasks

Added in version 3.2.

Source code: Lib/concurrent/futures/thread.py, Lib/concurrent/futures/process.py, and Lib/concurrent/futures/interpreter.py


Модуль concurrent.futures забезпечує інтерфейс високого рівня для асинхронного виконання викликів.

The asynchronous execution can be performed with threads, using ThreadPoolExecutor or InterpreterPoolExecutor, or separate processes, using ProcessPoolExecutor. Each implements the same interface, which is defined by the abstract Executor class.

Availability: not WASI.

This module does not work or is not available on WebAssembly. See WebAssembly platforms for more information.

Об'єкти виконавця

class concurrent.futures.Executor

Абстрактний клас, який надає методи для асинхронного виконання викликів. Його слід використовувати не безпосередньо, а через його конкретні підкласи.

submit(fn, /, *args, **kwargs)

Планує виконання викликаного, fn, як fn(*args, **kwargs) і повертає об’єкт Future, який представляє виконання викликаного.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)

Similar to map(fn, *iterables) except:

  • The iterables are collected immediately rather than lazily, unless a buffersize is specified to limit the number of submitted tasks whose results have not yet been yielded. If the buffer is full, iteration over the iterables pauses until a result is yielded from the buffer.

  • fn is executed asynchronously and several calls to fn may be made concurrently.

The returned iterator raises a TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.

If a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.

When using ProcessPoolExecutor, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. With ThreadPoolExecutor and InterpreterPoolExecutor, chunksize has no effect.

Berubah pada versi 3.5: Added the chunksize parameter.

Berubah pada versi 3.14: Added the buffersize parameter.

shutdown(wait=True, *, cancel_futures=False)

Дайте сигнал виконавцю, що він повинен звільнити будь-які ресурси, які він використовує, коли завершено виконання поточних ф’ючерсів. Виклики Executor.submit() і Executor.map(), здійснені після вимкнення, викличуть RuntimeError.

Якщо wait має значення True, тоді цей метод не повернеться, доки не буде завершено виконання всіх очікуваних ф’ючерсів і не буде звільнено ресурси, пов’язані з виконавцем. Якщо wait має значення False, тоді цей метод повернеться негайно, а ресурси, пов’язані з виконавцем, будуть звільнені, коли завершиться виконання всіх очікуваних ф’ючерсів. Незалежно від значення wait, уся програма Python не завершить роботу, доки не буде завершено виконання всіх незавершених ф’ючерсів.

Якщо cancel_futures має значення True, цей метод скасує всі незавершені ф’ючерси, які виконавець ще не запускав. Будь-які завершені або запущені ф’ючерси не будуть скасовані, незалежно від значення cancel_futures.

Якщо і cancel_futures, і wait мають значення True, усі ф’ючерси, які почав виконувати виконавець, будуть завершені до повернення цього методу. Решта ф'ючерсів скасовано.

Ви можете уникнути необхідності явного виклику цього методу, якщо використаєте інструкцію with, яка вимкне Executor (очікуючи так, ніби Executor.shutdown() викликається з установленим wait до Правда):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Berubah pada versi 3.9: Додано cancel_futures.

ThreadPoolExecutor

ThreadPoolExecutor — це підклас Executor, який використовує пул потоків для асинхронного виконання викликів.

Взаємоблокування можуть виникати, коли виклик, пов’язаний із Future, чекає результатів іншого Future. Наприклад:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

І:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Підклас Executor, який використовує пул щонайбільше max_workers потоків для асинхронного виконання викликів.

All threads enqueued to ThreadPoolExecutor will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.

initializer — необов’язковий виклик, який викликається на початку кожного робочого потоку; initargs — це кортеж аргументів, що передається ініціалізатору. Якщо ініціалізатор викликає виняток, усі наразі незавершені завдання викличуть BrokenThreadPool, а також будь-яка спроба відправити більше завдань до пулу.

Berubah pada versi 3.5: Якщо max_workers має значення None або не вказано, за замовчуванням буде використовуватися кількість процесорів на машині, помножена на 5, припускаючи, що ThreadPoolExecutor часто використовується для перекриття вводу-виводу замість роботи центрального процесора, а кількість робітників має бути більшою за кількість робітників для ProcessPoolExecutor.

Berubah pada versi 3.6: Added the thread_name_prefix parameter to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

Berubah pada versi 3.7: Додано аргументи initializer і initargs.

Berubah pada versi 3.8: Значення за замовчуванням max_workers змінено на min(32, os.cpu_count() + 4). Це значення за замовчуванням зберігає принаймні 5 працівників для завдань, пов’язаних із вводом-виводом. Він використовує щонайбільше 32 ядра ЦП для пов’язаних із ЦП завдань, які випускають GIL. І це дозволяє уникнути використання дуже великих ресурсів неявно на багатоядерних машинах.

ThreadPoolExecutor тепер повторно використовує неактивні робочі потоки перед запуском робочих потоків max_workers.

Berubah pada versi 3.13: Default value of max_workers is changed to min(32, (os.process_cpu_count() or 1) + 4).

Приклад ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

InterpreterPoolExecutor

The InterpreterPoolExecutor class uses a pool of interpreters to execute calls asynchronously. It is a ThreadPoolExecutor subclass, which means each worker is running in its own thread. The difference here is that each worker has its own interpreter, and runs each task using that interpreter.

The biggest benefit to using interpreters instead of only threads is true multi-core parallelism. Each interpreter has its own Global Interpreter Lock, so code running in one interpreter can run on one CPU core, while code in another interpreter runs unblocked on a different core.

The tradeoff is that writing concurrent code for use with multiple interpreters can take extra effort. However, this is because it forces you to be deliberate about how and when interpreters interact, and to be explicit about what data is shared between interpreters. This results in several benefits that help balance the extra effort, including true multi-core parallelism, For example, code written this way can make it easier to reason about concurrency. Another major benefit is that you don't have to deal with several of the big pain points of using threads, like race conditions.

Each worker's interpreter is isolated from all the other interpreters. "Isolated" means each interpreter has its own runtime state and operates completely independently. For example, if you redirect sys.stdout in one interpreter, it will not be automatically redirected to any other interpreter. If you import a module in one interpreter, it is not automatically imported in any other. You would need to import the module separately in interpreter where you need it. In fact, each module imported in an interpreter is a completely separate object from the same module in a different interpreter, including sys, builtins, and even __main__.

Isolation means a mutable object, or other data, cannot be used by more than one interpreter at the same time. That effectively means interpreters cannot actually share such objects or data. Instead, each interpreter must have its own copy, and you will have to synchronize any changes between the copies manually. Immutable objects and data, like the builtin singletons, strings, and tuples of immutable objects, don't have these limitations.

Communicating and synchronizing between interpreters is most effectively done using dedicated tools, like those proposed in PEP 734. One less efficient alternative is to serialize with pickle and then send the bytes over a shared socket or pipe.

class concurrent.futures.InterpreterPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

A ThreadPoolExecutor subclass that executes calls asynchronously using a pool of at most max_workers threads. Each thread runs tasks in its own interpreter. The worker interpreters are isolated from each other, which means each has its own runtime state and that they can't share any mutable objects or other data. Each interpreter has its own Global Interpreter Lock, which means code run with this executor has true multi-core parallelism.

The optional initializer and initargs arguments have the same meaning as for ThreadPoolExecutor: the initializer is run when each worker is created, though in this case it is run in the worker's interpreter. The executor serializes the initializer and initargs using pickle when sending them to the worker's interpreter.

Catatan

The executor may replace uncaught exceptions from initializer with ExecutionFailed.

Other caveats from parent ThreadPoolExecutor apply here.

submit() and map() work like normal, except the worker serializes the callable and arguments using pickle when sending them to its interpreter. The worker likewise serializes the return value when sending it back.

When a worker's current task raises an uncaught exception, the worker always tries to preserve the exception as-is. If that is successful then it also sets the __cause__ to a corresponding ExecutionFailed instance, which contains a summary of the original exception. In the uncommon case that the worker is not able to preserve the original as-is then it directly preserves the corresponding ExecutionFailed instance instead.

ProcessPoolExecutor

Клас ProcessPoolExecutor є підкласом Executor, який використовує пул процесів для асинхронного виконання викликів. ProcessPoolExecutor використовує модуль multiprocessing, який дозволяє йому обходити Global Interpreter Lock, але також означає, що можна виконувати та повертати лише об’єкти, які можна вибрати.

Модуль __main__ має бути імпортованим робочими підпроцесами. Це означає, що ProcessPoolExecutor не працюватиме в інтерактивному інтерпретаторі.

Виклик методів Executor або Future із виклику, надісланого до ProcessPoolExecutor, призведе до взаємоблокування.

Note that the restrictions on functions and arguments needing to picklable as per multiprocessing.Process apply when using submit() and map() on a ProcessPoolExecutor. A function defined in a REPL or a lambda should not be expected to work.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to os.process_cpu_count(). If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used. See Contexts and start methods.

initializer — необов’язковий виклик, який викликається на початку кожного робочого процесу; initargs — це кортеж аргументів, що передається ініціалізатору. Якщо ініціалізатор викличе виняток, усі наразі незавершені завдання викличуть BrokenProcessPool, а також будь-яка спроба надіслати більше завдань до пулу.

max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None which means worker processes will live as long as the pool. When a max is specified, the "spawn" multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the "fork" start method.

Berubah pada versi 3.3: When one of the worker processes terminates abruptly, a BrokenProcessPool error is now raised. Previously, behaviour was undefined but operations on the executor or its futures would often freeze or deadlock.

Berubah pada versi 3.7: Аргумент mp_context додано, щоб дозволити користувачам керувати start_method для робочих процесів, створених пулом.

Додано аргументи initializer і initargs.

Berubah pada versi 3.11: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.

Berubah pada versi 3.12: On POSIX systems, if your application has multiple threads and the multiprocessing context uses the "fork" start method: The os.fork() function called internally to spawn workers may raise a DeprecationWarning. Pass a mp_context configured to use a different start method. See the os.fork() documentation for further explanation.

Berubah pada versi 3.13: max_workers uses os.process_cpu_count() by default, instead of os.cpu_count().

Berubah pada versi 3.14: The default process start method (see Contexts and start methods) changed away from fork. If you require the fork start method for ProcessPoolExecutor you must explicitly pass mp_context=multiprocessing.get_context("fork").

terminate_workers()

Attempt to terminate all living worker processes immediately by calling Process.terminate on each of them. Internally, it will also call Executor.shutdown() to ensure that all other resources associated with the executor are freed.

After calling this method the caller should no longer submit tasks to the executor.

Added in version 3.14.

kill_workers()

Attempt to kill all living worker processes immediately by calling Process.kill on each of them. Internally, it will also call Executor.shutdown() to ensure that all other resources associated with the executor are freed.

After calling this method the caller should no longer submit tasks to the executor.

Added in version 3.14.

Приклад ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Об'єкти майбутнього

Клас Future інкапсулює асинхронне виконання викликаного. Future екземпляри створюються Executor.submit().

class concurrent.futures.Future

Інкапсулює асинхронне виконання викликаного. Future екземпляри створюються Executor.submit() і не повинні створюватися безпосередньо, за винятком тестування.

cancel()

Спроба скасувати виклик. Якщо виклик наразі виконується або завершується і його не можна скасувати, тоді метод поверне False, інакше виклик буде скасовано, а метод поверне True.

cancelled()

Повертає True, якщо виклик було успішно скасовано.

running()

Повертає True, якщо виклик зараз виконується і не може бути скасований.

done()

Повертає True, якщо виклик було успішно скасовано або завершено.

result(timeout=None)

Return the value returned by the call. If the call hasn't yet completed then this method will wait up to timeout seconds. If the call hasn't completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Якщо ф'ючерс скасовано до завершення, тоді буде викликано CancelledError.

Якщо виклик викликав виняток, цей метод викличе той самий виняток.

exception(timeout=None)

Return the exception raised by the call. If the call hasn't yet completed then this method will wait up to timeout seconds. If the call hasn't completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Якщо ф'ючерс скасовано до завершення, тоді буде викликано CancelledError.

Якщо виклик завершився без підняття, повертається None.

add_done_callback(fn)

Приєднує fn до майбутнього. fn буде викликано з майбутнім як єдиним аргументом, коли майбутнє скасовується або завершує роботу.

Додані виклики викликаються в тому порядку, в якому вони були додані, і завжди викликаються в потоці, що належить до процесу, який їх додав. Якщо виклик викликає підклас Exception, він буде зареєстрований і проігнорований. Якщо виклик викликає підклас BaseException, поведінка не визначена.

Якщо ф'ючерс уже завершено або скасовано, fn буде викликано негайно.

Наступні методи Future призначені для використання в модульних тестах і реалізаціях Executor.

set_running_or_notify_cancel()

Цей метод має викликатися лише реалізаціями Executor перед виконанням роботи, пов’язаної з Future і модульними тестами.

If the method returns False then the Future was cancelled, i.e. Future.cancel() was called and returned True. Any threads waiting on the Future completing (i.e. through as_completed() or wait()) will be woken up.

If the method returns True then the Future was not cancelled and has been put in the running state, i.e. calls to Future.running() will return True.

Цей метод можна викликати лише один раз і не можна викликати після виклику Future.set_result() або Future.set_exception().

set_result(result)

Встановлює для результату роботи, пов’язаної з Future значення result.

Цей метод має використовуватися лише реалізаціями Executor і модульними тестами.

Berubah pada versi 3.8: Цей метод викликає concurrent.futures.InvalidStateError, якщо Future вже виконано.

set_exception(exception)

Встановлює для результату роботи, пов’язаної з Future значення Exception виняток.

Цей метод має використовуватися лише реалізаціями Executor і модульними тестами.

Berubah pada versi 3.8: Цей метод викликає concurrent.futures.InvalidStateError, якщо Future вже виконано.

Функції модуля

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Зачекайте, доки екземпляри Future (можливо, створені різними екземплярами Executor), надані fs, завершаться. Подвійні ф’ючерси, надані fs, видаляються та повертаються лише один раз. Повертає іменований 2-кортеж наборів. Перший набір, названий done, містить ф'ючерси, які завершилися (завершені або скасовані ф'ючерси) до завершення очікування. Другий набір під назвою not_done містить ф'ючерси, які не завершилися (ф'ючерси, що очікують або виконуються).

timeout можна використовувати для контролю максимальної кількості секунд очікування перед поверненням. timeout може бути int або float. Якщо timeout не вказано або None, час очікування не обмежений.

return_when вказує, коли ця функція має повернутися. Це має бути одна з таких констант:

Konstanta

Deskripsi

concurrent.futures.FIRST_COMPLETED

Функція повернеться, коли будь-який майбутній завершиться або буде скасовано.

concurrent.futures.FIRST_EXCEPTION

The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.

concurrent.futures.ALL_COMPLETED

Функція повернеться, коли всі ф’ючерси закінчаться або будуть скасовані.

concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a TimeoutError if __next__() is called and the result isn't available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Lihat juga

PEP 3148 -- ф'ючерси - виконувати обчислення асинхронно

Пропозиція, яка описує цю функцію для включення в стандартну бібліотеку Python.

Класи винятків

exception concurrent.futures.CancelledError

Піднімається, коли ф’ючерс скасовується.

exception concurrent.futures.TimeoutError

A deprecated alias of TimeoutError, raised when a future operation exceeds the given timeout.

Berubah pada versi 3.11: This class was made an alias of TimeoutError.

exception concurrent.futures.BrokenExecutor

Похідний від RuntimeError, цей клас винятків виникає, коли виконавець з певної причини не працює, і його не можна використовувати для надсилання або виконання нових завдань.

Added in version 3.7.

exception concurrent.futures.InvalidStateError

Викликається, коли над майбутнім виконується операція, яка не дозволена в поточному стані.

Added in version 3.8.

exception concurrent.futures.thread.BrokenThreadPool

Derived from BrokenExecutor, this exception class is raised when one of the workers of a ThreadPoolExecutor has failed initializing.

Added in version 3.7.

exception concurrent.futures.interpreter.BrokenInterpreterPool

Derived from BrokenThreadPool, this exception class is raised when one of the workers of a InterpreterPoolExecutor has failed initializing.

Added in version 3.14.

exception concurrent.futures.interpreter.ExecutionFailed

Raised from InterpreterPoolExecutor when the given initializer fails or from submit() when there's an uncaught exception from the submitted task.

Added in version 3.14.

exception concurrent.futures.process.BrokenProcessPool

Derived from BrokenExecutor (formerly RuntimeError), this exception class is raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion (for example, if it was killed from the outside).

Added in version 3.3.