concurrent.futures — Launching parallel tasks

Added in version 3.2.

Вихідний код: Lib/concurrent/futures/thread.py і Lib/concurrent/futures/process.py


Модуль concurrent.futures забезпечує інтерфейс високого рівня для асинхронного виконання викликів.

Асинхронне виконання може виконуватися потоками, використовуючи ThreadPoolExecutor, або окремими процесами, використовуючи ProcessPoolExecutor. Обидва реалізують однаковий інтерфейс, який визначається абстрактним класом Executor.

Availability: not WASI.

This module does not work or is not available on WebAssembly. See WebAssembly platforms for more information.

Об’єкти виконавця

class concurrent.futures.Executor

Абстрактний клас, який надає методи для асинхронного виконання викликів. Його слід використовувати не безпосередньо, а через його конкретні підкласи.

submit(fn, /, *args, **kwargs)

Планує виконання викликаного, fn, як fn(*args, **kwargs) і повертає об’єкт Future, який представляє виконання викликаного.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

Similar to map(fn, *iterables) except:

  • iterables збираються негайно, а не ліниво;

  • fn is executed asynchronously and several calls to fn may be made concurrently.

The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Executor.map(). timeout can be an int or a float. If timeout is not specified or None, there is no limit to the wait time.

If a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.

Під час використання ProcessPoolExecutor цей метод розбиває iterables на кілька фрагментів, які надсилає до пулу як окремі завдання. (Приблизний) розмір цих фрагментів можна вказати, встановивши для chunksize додатне ціле число. Для дуже довгих ітерацій використання великого значення chunksize може значно підвищити продуктивність порівняно з розміром за замовчуванням 1. З ThreadPoolExecutor chunksize не впливає.

Змінено в версії 3.5: Додано аргумент chunksize.

shutdown(wait=True, *, cancel_futures=False)

Дайте сигнал виконавцю, що він повинен звільнити будь-які ресурси, які він використовує, коли завершено виконання поточних ф’ючерсів. Виклики Executor.submit() і Executor.map(), здійснені після вимкнення, викличуть RuntimeError.

Якщо wait має значення True, тоді цей метод не повернеться, доки не буде завершено виконання всіх очікуваних ф’ючерсів і не буде звільнено ресурси, пов’язані з виконавцем. Якщо wait має значення False, тоді цей метод повернеться негайно, а ресурси, пов’язані з виконавцем, будуть звільнені, коли завершиться виконання всіх очікуваних ф’ючерсів. Незалежно від значення wait, уся програма Python не завершить роботу, доки не буде завершено виконання всіх незавершених ф’ючерсів.

Якщо cancel_futures має значення True, цей метод скасує всі незавершені ф’ючерси, які виконавець ще не запускав. Будь-які завершені або запущені ф’ючерси не будуть скасовані, незалежно від значення cancel_futures.

Якщо і cancel_futures, і wait мають значення True, усі ф’ючерси, які почав виконувати виконавець, будуть завершені до повернення цього методу. Решта ф’ючерсів скасовано.

Ви можете уникнути необхідності явного виклику цього методу, якщо використаєте інструкцію with, яка вимкне Executor (очікуючи так, ніби Executor.shutdown() викликається з установленим wait до Правда):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Змінено в версії 3.9: Додано cancel_futures.

ThreadPoolExecutor

ThreadPoolExecutor — це підклас Executor, який використовує пул потоків для асинхронного виконання викликів.

Взаємоблокування можуть виникати, коли виклик, пов’язаний із Future, чекає результатів іншого Future. Наприклад:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

І:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Підклас Executor, який використовує пул щонайбільше max_workers потоків для асинхронного виконання викликів.

All threads enqueued to ThreadPoolExecutor will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added using atexit. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended that ThreadPoolExecutor not be used for long-running tasks.

initializer — необов’язковий виклик, який викликається на початку кожного робочого потоку; initargs — це кортеж аргументів, що передається ініціалізатору. Якщо ініціалізатор викликає виняток, усі наразі незавершені завдання викличуть BrokenThreadPool, а також будь-яка спроба відправити більше завдань до пулу.

Змінено в версії 3.5: Якщо max_workers має значення None або не вказано, за замовчуванням буде використовуватися кількість процесорів на машині, помножена на 5, припускаючи, що ThreadPoolExecutor часто використовується для перекриття вводу-виводу замість роботи центрального процесора, а кількість робітників має бути більшою за кількість робітників для ProcessPoolExecutor.

Змінено в версії 3.6: Added the thread_name_prefix parameter to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

Змінено в версії 3.7: Додано аргументи initializer і initargs.

Змінено в версії 3.8: Значення за замовчуванням max_workers змінено на min(32, os.cpu_count() + 4). Це значення за замовчуванням зберігає принаймні 5 працівників для завдань, пов’язаних із вводом-виводом. Він використовує щонайбільше 32 ядра ЦП для пов’язаних із ЦП завдань, які випускають GIL. І це дозволяє уникнути використання дуже великих ресурсів неявно на багатоядерних машинах.

ThreadPoolExecutor тепер повторно використовує неактивні робочі потоки перед запуском робочих потоків max_workers.

Змінено в версії 3.13: Default value of max_workers is changed to min(32, (os.process_cpu_count() or 1) + 4).

Приклад ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistent-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

Клас ProcessPoolExecutor є підкласом Executor, який використовує пул процесів для асинхронного виконання викликів. ProcessPoolExecutor використовує модуль multiprocessing, який дозволяє йому обходити Global Interpreter Lock, але також означає, що можна виконувати та повертати лише об’єкти, які можна вибрати.

Модуль __main__ має бути імпортованим робочими підпроцесами. Це означає, що ProcessPoolExecutor не працюватиме в інтерактивному інтерпретаторі.

Виклик методів Executor або Future із виклику, надісланого до ProcessPoolExecutor, призведе до взаємоблокування.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to os.process_cpu_count(). If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used. See Контексти та методи запуску.

initializer — необов’язковий виклик, який викликається на початку кожного робочого процесу; initargs — це кортеж аргументів, що передається ініціалізатору. Якщо ініціалізатор викличе виняток, усі наразі незавершені завдання викличуть BrokenProcessPool, а також будь-яка спроба надіслати більше завдань до пулу.

max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is None which means worker processes will live as long as the pool. When a max is specified, the «spawn» multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the «fork» start method.

Змінено в версії 3.3: When one of the worker processes terminates abruptly, a BrokenProcessPool error is now raised. Previously, behaviour was undefined but operations on the executor or its futures would often freeze or deadlock.

Змінено в версії 3.7: Аргумент mp_context додано, щоб дозволити користувачам керувати start_method для робочих процесів, створених пулом.

Додано аргументи initializer і initargs.

Примітка

The default multiprocessing start method (see Контексти та методи запуску) will change away from fork in Python 3.14. Code that requires fork be used for their ProcessPoolExecutor should explicitly specify that by passing a mp_context=multiprocessing.get_context("fork") parameter.

Змінено в версії 3.11: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.

Змінено в версії 3.12: On POSIX systems, if your application has multiple threads and the multiprocessing context uses the "fork" start method: The os.fork() function called internally to spawn workers may raise a DeprecationWarning. Pass a mp_context configured to use a different start method. See the os.fork() documentation for further explanation.

Змінено в версії 3.13: max_workers uses os.process_cpu_count() by default, instead of os.cpu_count().

Приклад ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Об’єкти майбутнього

Клас Future інкапсулює асинхронне виконання викликаного. Future екземпляри створюються Executor.submit().

class concurrent.futures.Future

Інкапсулює асинхронне виконання викликаного. Future екземпляри створюються Executor.submit() і не повинні створюватися безпосередньо, за винятком тестування.

cancel()

Спроба скасувати виклик. Якщо виклик наразі виконується або завершується і його не можна скасувати, тоді метод поверне False, інакше виклик буде скасовано, а метод поверне True.

cancelled()

Повертає True, якщо виклик було успішно скасовано.

running()

Повертає True, якщо виклик зараз виконується і не може бути скасований.

done()

Повертає True, якщо виклик було успішно скасовано або завершено.

result(timeout=None)

Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Якщо ф’ючерс скасовано до завершення, тоді буде викликано CancelledError.

Якщо виклик викликав виняток, цей метод викличе той самий виняток.

exception(timeout=None)

Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Якщо ф’ючерс скасовано до завершення, тоді буде викликано CancelledError.

Якщо виклик завершився без підняття, повертається None.

add_done_callback(fn)

Приєднує fn до майбутнього. fn буде викликано з майбутнім як єдиним аргументом, коли майбутнє скасовується або завершує роботу.

Додані виклики викликаються в тому порядку, в якому вони були додані, і завжди викликаються в потоці, що належить до процесу, який їх додав. Якщо виклик викликає підклас Exception, він буде зареєстрований і проігнорований. Якщо виклик викликає підклас BaseException, поведінка не визначена.

Якщо ф’ючерс уже завершено або скасовано, fn буде викликано негайно.

Наступні методи Future призначені для використання в модульних тестах і реалізаціях Executor.

set_running_or_notify_cancel()

Цей метод має викликатися лише реалізаціями Executor перед виконанням роботи, пов’язаної з Future і модульними тестами.

If the method returns False then the Future was cancelled, i.e. Future.cancel() was called and returned True. Any threads waiting on the Future completing (i.e. through as_completed() or wait()) will be woken up.

If the method returns True then the Future was not cancelled and has been put in the running state, i.e. calls to Future.running() will return True.

Цей метод можна викликати лише один раз і не можна викликати після виклику Future.set_result() або Future.set_exception().

set_result(result)

Встановлює для результату роботи, пов’язаної з Future значення result.

Цей метод має використовуватися лише реалізаціями Executor і модульними тестами.

Змінено в версії 3.8: Цей метод викликає concurrent.futures.InvalidStateError, якщо Future вже виконано.

set_exception(exception)

Встановлює для результату роботи, пов’язаної з Future значення Exception виняток.

Цей метод має використовуватися лише реалізаціями Executor і модульними тестами.

Змінено в версії 3.8: Цей метод викликає concurrent.futures.InvalidStateError, якщо Future вже виконано.

Функції модуля

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Зачекайте, доки екземпляри Future (можливо, створені різними екземплярами Executor), надані fs, завершаться. Подвійні ф’ючерси, надані fs, видаляються та повертаються лише один раз. Повертає іменований 2-кортеж наборів. Перший набір, названий done, містить ф’ючерси, які завершилися (завершені або скасовані ф’ючерси) до завершення очікування. Другий набір під назвою not_done містить ф’ючерси, які не завершилися (ф’ючерси, що очікують або виконуються).

timeout можна використовувати для контролю максимальної кількості секунд очікування перед поверненням. timeout може бути int або float. Якщо timeout не вказано або None, час очікування не обмежений.

return_when вказує, коли ця функція має повернутися. Це має бути одна з таких констант:

Постійний

опис

concurrent.futures.FIRST_COMPLETED

Функція повернеться, коли будь-який майбутній завершиться або буде скасовано.

concurrent.futures.FIRST_EXCEPTION

The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.

concurrent.futures.ALL_COMPLETED

Функція повернеться, коли всі ф’ючерси закінчаться або будуть скасовані.

concurrent.futures.as_completed(fs, timeout=None)

Returns an iterator over the Future instances (possibly created by different Executor instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed before as_completed() is called will be yielded first. The returned iterator raises a TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to as_completed(). timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

Дивись також

PEP 3148 – ф’ючерси - виконувати обчислення асинхронно

Пропозиція, яка описує цю функцію для включення в стандартну бібліотеку Python.

Класи винятків

exception concurrent.futures.CancelledError

Піднімається, коли ф’ючерс скасовується.

exception concurrent.futures.TimeoutError

A deprecated alias of TimeoutError, raised when a future operation exceeds the given timeout.

Змінено в версії 3.11: Цей клас отримав псевдонім TimeoutError.

exception concurrent.futures.BrokenExecutor

Похідний від RuntimeError, цей клас винятків виникає, коли виконавець з певної причини не працює, і його не можна використовувати для надсилання або виконання нових завдань.

Added in version 3.7.

exception concurrent.futures.InvalidStateError

Викликається, коли над майбутнім виконується операція, яка не дозволена в поточному стані.

Added in version 3.8.

exception concurrent.futures.thread.BrokenThreadPool

Derived from BrokenExecutor, this exception class is raised when one of the workers of a ThreadPoolExecutor has failed initializing.

Added in version 3.7.

exception concurrent.futures.process.BrokenProcessPool

Derived from BrokenExecutor (formerly RuntimeError), this exception class is raised when one of the workers of a ProcessPoolExecutor has terminated in a non-clean fashion (for example, if it was killed from the outside).

Added in version 3.3.