concurrent.futures
— Launching parallel tasks¶
Added in version 3.2.
Вихідний код: Lib/concurrent/futures/thread.py і Lib/concurrent/futures/process.py
Модуль concurrent.futures
забезпечує інтерфейс високого рівня для асинхронного виконання викликів.
Асинхронне виконання може виконуватися потоками, використовуючи ThreadPoolExecutor
, або окремими процесами, використовуючи ProcessPoolExecutor
. Обидва реалізують однаковий інтерфейс, який визначається абстрактним класом Executor
.
Availability: not WASI.
This module does not work or is not available on WebAssembly. See WebAssembly platforms for more information.
Об’єкти виконавця¶
- class concurrent.futures.Executor¶
Абстрактний клас, який надає методи для асинхронного виконання викликів. Його слід використовувати не безпосередньо, а через його конкретні підкласи.
- submit(fn, /, *args, **kwargs)¶
Планує виконання викликаного, fn, як
fn(*args, **kwargs)
і повертає об’єктFuture
, який представляє виконання викликаного.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
Similar to
map(fn, *iterables)
except:iterables збираються негайно, а не ліниво;
fn is executed asynchronously and several calls to fn may be made concurrently.
The returned iterator raises a
TimeoutError
if__next__()
is called and the result isn’t available after timeout seconds from the original call toExecutor.map()
. timeout can be an int or a float. If timeout is not specified orNone
, there is no limit to the wait time.If a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
Під час використання
ProcessPoolExecutor
цей метод розбиває iterables на кілька фрагментів, які надсилає до пулу як окремі завдання. (Приблизний) розмір цих фрагментів можна вказати, встановивши для chunksize додатне ціле число. Для дуже довгих ітерацій використання великого значення chunksize може значно підвищити продуктивність порівняно з розміром за замовчуванням 1. ЗThreadPoolExecutor
chunksize не впливає.Змінено в версії 3.5: Додано аргумент chunksize.
- shutdown(wait=True, *, cancel_futures=False)¶
Дайте сигнал виконавцю, що він повинен звільнити будь-які ресурси, які він використовує, коли завершено виконання поточних ф’ючерсів. Виклики
Executor.submit()
іExecutor.map()
, здійснені після вимкнення, викличутьRuntimeError
.Якщо wait має значення
True
, тоді цей метод не повернеться, доки не буде завершено виконання всіх очікуваних ф’ючерсів і не буде звільнено ресурси, пов’язані з виконавцем. Якщо wait має значенняFalse
, тоді цей метод повернеться негайно, а ресурси, пов’язані з виконавцем, будуть звільнені, коли завершиться виконання всіх очікуваних ф’ючерсів. Незалежно від значення wait, уся програма Python не завершить роботу, доки не буде завершено виконання всіх незавершених ф’ючерсів.Якщо cancel_futures має значення
True
, цей метод скасує всі незавершені ф’ючерси, які виконавець ще не запускав. Будь-які завершені або запущені ф’ючерси не будуть скасовані, незалежно від значення cancel_futures.Якщо і cancel_futures, і wait мають значення
True
, усі ф’ючерси, які почав виконувати виконавець, будуть завершені до повернення цього методу. Решта ф’ючерсів скасовано.Ви можете уникнути необхідності явного виклику цього методу, якщо використаєте інструкцію
with
, яка вимкнеExecutor
(очікуючи так, нібиExecutor.shutdown()
викликається з установленим wait доПравда
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
Змінено в версії 3.9: Додано cancel_futures.
ThreadPoolExecutor¶
ThreadPoolExecutor
— це підклас Executor
, який використовує пул потоків для асинхронного виконання викликів.
Взаємоблокування можуть виникати, коли виклик, пов’язаний із Future
, чекає результатів іншого Future
. Наприклад:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
І:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
Підклас
Executor
, який використовує пул щонайбільше max_workers потоків для асинхронного виконання викликів.All threads enqueued to
ThreadPoolExecutor
will be joined before the interpreter can exit. Note that the exit handler which does this is executed before any exit handlers added usingatexit
. This means exceptions in the main thread must be caught and handled in order to signal threads to exit gracefully. For this reason, it is recommended thatThreadPoolExecutor
not be used for long-running tasks.initializer — необов’язковий виклик, який викликається на початку кожного робочого потоку; initargs — це кортеж аргументів, що передається ініціалізатору. Якщо ініціалізатор викликає виняток, усі наразі незавершені завдання викличуть
BrokenThreadPool
, а також будь-яка спроба відправити більше завдань до пулу.Змінено в версії 3.5: Якщо max_workers має значення
None
або не вказано, за замовчуванням буде використовуватися кількість процесорів на машині, помножена на5
, припускаючи, щоThreadPoolExecutor
часто використовується для перекриття вводу-виводу замість роботи центрального процесора, а кількість робітників має бути більшою за кількість робітників дляProcessPoolExecutor
.Змінено в версії 3.6: Added the thread_name_prefix parameter to allow users to control the
threading.Thread
names for worker threads created by the pool for easier debugging.Змінено в версії 3.7: Додано аргументи initializer і initargs.
Змінено в версії 3.8: Значення за замовчуванням max_workers змінено на
min(32, os.cpu_count() + 4)
. Це значення за замовчуванням зберігає принаймні 5 працівників для завдань, пов’язаних із вводом-виводом. Він використовує щонайбільше 32 ядра ЦП для пов’язаних із ЦП завдань, які випускають GIL. І це дозволяє уникнути використання дуже великих ресурсів неявно на багатоядерних машинах.ThreadPoolExecutor тепер повторно використовує неактивні робочі потоки перед запуском робочих потоків max_workers.
Змінено в версії 3.13: Default value of max_workers is changed to
min(32, (os.process_cpu_count() or 1) + 4)
.
Приклад ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistent-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
Клас ProcessPoolExecutor
є підкласом Executor
, який використовує пул процесів для асинхронного виконання викликів. ProcessPoolExecutor
використовує модуль multiprocessing
, який дозволяє йому обходити Global Interpreter Lock, але також означає, що можна виконувати та повертати лише об’єкти, які можна вибрати.
Модуль __main__
має бути імпортованим робочими підпроцесами. Це означає, що ProcessPoolExecutor
не працюватиме в інтерактивному інтерпретаторі.
Виклик методів Executor
або Future
із виклику, надісланого до ProcessPoolExecutor
, призведе до взаємоблокування.
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
An
Executor
subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers isNone
or not given, it will default toos.process_cpu_count()
. If max_workers is less than or equal to0
, then aValueError
will be raised. On Windows, max_workers must be less than or equal to61
. If it is not thenValueError
will be raised. If max_workers isNone
, then the default chosen will be at most61
, even if more processors are available. mp_context can be amultiprocessing
context orNone
. It will be used to launch the workers. If mp_context isNone
or not given, the defaultmultiprocessing
context is used. See Контексти та методи запуску.initializer — необов’язковий виклик, який викликається на початку кожного робочого процесу; initargs — це кортеж аргументів, що передається ініціалізатору. Якщо ініціалізатор викличе виняток, усі наразі незавершені завдання викличуть
BrokenProcessPool
, а також будь-яка спроба надіслати більше завдань до пулу.max_tasks_per_child is an optional argument that specifies the maximum number of tasks a single process can execute before it will exit and be replaced with a fresh worker process. By default max_tasks_per_child is
None
which means worker processes will live as long as the pool. When a max is specified, the «spawn» multiprocessing start method will be used by default in absence of a mp_context parameter. This feature is incompatible with the «fork» start method.Змінено в версії 3.3: When one of the worker processes terminates abruptly, a
BrokenProcessPool
error is now raised. Previously, behaviour was undefined but operations on the executor or its futures would often freeze or deadlock.Змінено в версії 3.7: Аргумент mp_context додано, щоб дозволити користувачам керувати start_method для робочих процесів, створених пулом.
Додано аргументи initializer і initargs.
Примітка
The default
multiprocessing
start method (see Контексти та методи запуску) will change away from fork in Python 3.14. Code that requires fork be used for theirProcessPoolExecutor
should explicitly specify that by passing amp_context=multiprocessing.get_context("fork")
parameter.Змінено в версії 3.11: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.
Змінено в версії 3.12: On POSIX systems, if your application has multiple threads and the
multiprocessing
context uses the"fork"
start method: Theos.fork()
function called internally to spawn workers may raise aDeprecationWarning
. Pass a mp_context configured to use a different start method. See theos.fork()
documentation for further explanation.Змінено в версії 3.13: max_workers uses
os.process_cpu_count()
by default, instead ofos.cpu_count()
.
Приклад ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Об’єкти майбутнього¶
Клас Future
інкапсулює асинхронне виконання викликаного. Future
екземпляри створюються Executor.submit()
.
- class concurrent.futures.Future¶
Інкапсулює асинхронне виконання викликаного.
Future
екземпляри створюютьсяExecutor.submit()
і не повинні створюватися безпосередньо, за винятком тестування.- cancel()¶
Спроба скасувати виклик. Якщо виклик наразі виконується або завершується і його не можна скасувати, тоді метод поверне
False
, інакше виклик буде скасовано, а метод повернеTrue
.
- cancelled()¶
Повертає
True
, якщо виклик було успішно скасовано.
- running()¶
Повертає
True
, якщо виклик зараз виконується і не може бути скасований.
- done()¶
Повертає
True
, якщо виклик було успішно скасовано або завершено.
- result(timeout=None)¶
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a
TimeoutError
will be raised. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.Якщо ф’ючерс скасовано до завершення, тоді буде викликано
CancelledError
.Якщо виклик викликав виняток, цей метод викличе той самий виняток.
- exception(timeout=None)¶
Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a
TimeoutError
will be raised. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.Якщо ф’ючерс скасовано до завершення, тоді буде викликано
CancelledError
.Якщо виклик завершився без підняття, повертається
None
.
- add_done_callback(fn)¶
Приєднує fn до майбутнього. fn буде викликано з майбутнім як єдиним аргументом, коли майбутнє скасовується або завершує роботу.
Додані виклики викликаються в тому порядку, в якому вони були додані, і завжди викликаються в потоці, що належить до процесу, який їх додав. Якщо виклик викликає підклас
Exception
, він буде зареєстрований і проігнорований. Якщо виклик викликає підкласBaseException
, поведінка не визначена.Якщо ф’ючерс уже завершено або скасовано, fn буде викликано негайно.
Наступні методи
Future
призначені для використання в модульних тестах і реалізаціяхExecutor
.- set_running_or_notify_cancel()¶
Цей метод має викликатися лише реалізаціями
Executor
перед виконанням роботи, пов’язаної зFuture
і модульними тестами.If the method returns
False
then theFuture
was cancelled, i.e.Future.cancel()
was called and returnedTrue
. Any threads waiting on theFuture
completing (i.e. throughas_completed()
orwait()
) will be woken up.If the method returns
True
then theFuture
was not cancelled and has been put in the running state, i.e. calls toFuture.running()
will returnTrue
.Цей метод можна викликати лише один раз і не можна викликати після виклику
Future.set_result()
абоFuture.set_exception()
.
- set_result(result)¶
Встановлює для результату роботи, пов’язаної з
Future
значення result.Цей метод має використовуватися лише реалізаціями
Executor
і модульними тестами.Змінено в версії 3.8: Цей метод викликає
concurrent.futures.InvalidStateError
, якщоFuture
вже виконано.
Функції модуля¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
Зачекайте, доки екземпляри
Future
(можливо, створені різними екземплярамиExecutor
), надані fs, завершаться. Подвійні ф’ючерси, надані fs, видаляються та повертаються лише один раз. Повертає іменований 2-кортеж наборів. Перший набір, названийdone
, містить ф’ючерси, які завершилися (завершені або скасовані ф’ючерси) до завершення очікування. Другий набір під назвоюnot_done
містить ф’ючерси, які не завершилися (ф’ючерси, що очікують або виконуються).timeout можна використовувати для контролю максимальної кількості секунд очікування перед поверненням. timeout може бути int або float. Якщо timeout не вказано або
None
, час очікування не обмежений.return_when вказує, коли ця функція має повернутися. Це має бути одна з таких констант:
Постійний
опис
- concurrent.futures.FIRST_COMPLETED¶
Функція повернеться, коли будь-який майбутній завершиться або буде скасовано.
- concurrent.futures.FIRST_EXCEPTION¶
The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to
ALL_COMPLETED
.- concurrent.futures.ALL_COMPLETED¶
Функція повернеться, коли всі ф’ючерси закінчаться або будуть скасовані.
- concurrent.futures.as_completed(fs, timeout=None)¶
Returns an iterator over the
Future
instances (possibly created by differentExecutor
instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed beforeas_completed()
is called will be yielded first. The returned iterator raises aTimeoutError
if__next__()
is called and the result isn’t available after timeout seconds from the original call toas_completed()
. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.
Дивись також
- PEP 3148 – ф’ючерси - виконувати обчислення асинхронно
Пропозиція, яка описує цю функцію для включення в стандартну бібліотеку Python.
Класи винятків¶
- exception concurrent.futures.CancelledError¶
Піднімається, коли ф’ючерс скасовується.
- exception concurrent.futures.TimeoutError¶
A deprecated alias of
TimeoutError
, raised when a future operation exceeds the given timeout.Змінено в версії 3.11: Цей клас отримав псевдонім
TimeoutError
.
- exception concurrent.futures.BrokenExecutor¶
Похідний від
RuntimeError
, цей клас винятків виникає, коли виконавець з певної причини не працює, і його не можна використовувати для надсилання або виконання нових завдань.Added in version 3.7.
- exception concurrent.futures.InvalidStateError¶
Викликається, коли над майбутнім виконується операція, яка не дозволена в поточному стані.
Added in version 3.8.
- exception concurrent.futures.thread.BrokenThreadPool¶
Derived from
BrokenExecutor
, this exception class is raised when one of the workers of aThreadPoolExecutor
has failed initializing.Added in version 3.7.
- exception concurrent.futures.process.BrokenProcessPool¶
Derived from
BrokenExecutor
(formerlyRuntimeError
), this exception class is raised when one of the workers of aProcessPoolExecutor
has terminated in a non-clean fashion (for example, if it was killed from the outside).Added in version 3.3.