concurrent.futures
— Lanzamiento de tareas paralelas¶
Nuevo en la versión 3.2.
Código fuente: Lib/concurrent/futures/thread.py y Lib/concurrent/futures/process.py
El módulo concurrent.futures
provee una interfaz de alto nivel para ejecutar invocables de forma asincrónica.
La ejecución asincrónica se puede realizar mediante hilos, usando ThreadPoolExecutor
, o procesos independientes, mediante ProcessPoolExecutor
. Ambos implementan la misma interfaz, que se encuentra definida por la clase abstracta Executor
.
Availability: not Emscripten, not WASI.
This module does not work or is not available on WebAssembly platforms
wasm32-emscripten
and wasm32-wasi
. See
Plataformas WebAssembly for more information.
Objetos ejecutores¶
- class concurrent.futures.Executor¶
Una clase abstracta que proporciona métodos para ejecutar llamadas de forma asincrónica. No debe ser utilizada directamente, sino a través de sus subclases.
- submit(fn, /, *args, **kwargs)¶
Schedules the callable, fn, to be executed as
fn(*args, **kwargs)
and returns aFuture
object representing the execution of the callable.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
Similar to
map(fn, *iterables)
except:los iterables son recolectados inmediatamente, en lugar de perezosamente;
fn is executed asynchronously and several calls to fn may be made concurrently.
The returned iterator raises a
TimeoutError
if__next__()
is called and the result isn’t available after timeout seconds from the original call toExecutor.map()
. timeout can be an int or a float. If timeout is not specified orNone
, there is no limit to the wait time.If a fn call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
Al usar
ProcessPoolExecutor
, este método divide los iterables en varios fragmentos que luego envía al grupo como tareas separadas. El tamaño (aproximado) de estos fragmentos puede especificarse estableciendo chunksize a un entero positivo. El uso de un valor grande para chunksize puede mejorar significativamente el rendimiento en comparación con el tamaño predeterminado de 1. ConThreadPoolExecutor
, el chunksize no tiene ningún efecto.Distinto en la versión 3.5: Se agregó el argumento chunksize.
- shutdown(wait=True, *, cancel_futures=False)¶
Indica al ejecutor que debe liberar todos los recursos que está utilizando cuando los futuros actualmente pendientes de ejecución finalicen. Las llamadas a
Executor.submit()
yExecutor.map()
realizadas después del apagado lanzaránRuntimeError
.Si wait es
True
este método no retornará hasta que todos los futuros pendientes hayan terminado su ejecución y los recursos asociados al ejecutor hayan sido liberados. Si wait esFalse
, este método retornará de inmediato y los recursos asociados al ejecutor se liberarán cuando todos los futuros asociados hayan finalizado su ejecución. Independientemente del valor de wait, el programa Python entero no finalizará hasta que todos los futuros pendientes hayan finalizado su ejecución.Si cancel_futures es
True
, este método cancelará todos los futuros pendientes que el ejecutor no ha comenzado a ejecutar. Los futuros que se completen o estén en ejecución no se cancelarán, independientemente del valor de cancel_futures.Si tanto cancel_futures como wait son
True
, todos los futuros que el ejecutor ha comenzado a ejecutar se completarán antes de que regrese este método. Los futuros restantes se cancelan.Se puede evitar tener que llamar este método explícitamente si se usa la sentencia
with
, la cual apagará elExecutor
(esperando como siExecutor.shutdown()
hubiera sido llamado con wait enTrue
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
Distinto en la versión 3.9: Agregado cancel_futures.
ThreadPoolExecutor¶
ThreadPoolExecutor
es una subclase de Executor
que usa un grupo de hilos para ejecutar llamadas de forma asincrónica.
Pueden ocurrir bloqueos mutuos cuando la llamada asociada a un Future
espera el resultado de otro Future
. Por ejemplo:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
Y:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
Subclase de
Executor
que utiliza un grupo de hilos de max_workers como máximo para ejecutar llamadas de forma asincrónica.Todos los hilos que se hayan puesto en cola en
ThreadPoolExecutor
se unirán antes de que el intérprete pueda salir. Tenga en cuenta que el controlador de salida que hace esto se ejecuta antes de cualquier controlador de salida añadido medianteatexit
. Esto significa que las excepciones en el hilo principal deben ser capturadas y manejadas para indicar a los hilos que salgan correctamente. Por esta razón, se recomienda no utilizarThreadPoolExecutor
para tareas de larga duración.initializer es un invocable opcional que es llamado al comienzo de cada hilo de trabajo; initargs es una tupla de argumentos pasados al inicializador. Si el initializer lanza una excepción, todos los trabajos actualmente pendientes lanzarán
BrokenThreadPool
, así como cualquier intento de enviar más trabajos al grupo.Distinto en la versión 3.5: Si max_workers es
None
o no es especificado, se tomará por defecto el número de procesadores de la máquina, multiplicado por5
, asumiendo queThreadPoolExecutor
a menudo se utiliza para paralelizar E/S en lugar de trabajo de CPU y que el numero de trabajadores debe ser mayor que el número de trabajadores paraProcessPoolExecutor
.Distinto en la versión 3.6: Added the thread_name_prefix parameter to allow users to control the
threading.Thread
names for worker threads created by the pool for easier debugging.Distinto en la versión 3.7: Se agregaron los argumentos initializer y initargs.
Distinto en la versión 3.8: El valor predeterminado de max_workers fue reemplazado por
min(32, os.cpu_count() + 4)
. Este valor predeterminado conserva al menos 5 trabajadores para las tareas vinculadas de E/S. Utiliza como máximo 32 núcleos de CPU para tareas vinculadas a la CPU que liberan el GIL. Y evita utilizar recursos muy grandes implícitamente en máquinas de muchos núcleos.ThreadPoolExecutor ahora también reutiliza hilos inactivos antes de crear max_workers hilos de trabajo.
Ejemplo de ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
La clase ProcessPoolExecutor
es una subclase Executor
que usa un grupo de procesos para ejecutar llamadas de forma asíncrona. ProcessPoolExecutor
usa el módulo multiprocessing
, que le permite eludir el Global Interpreter Lock pero también significa que solo los objetos seleccionables pueden ser ejecutados y retornados.
El módulo __main__
debe ser importable por los subprocesos trabajadores. Esto significa que ProcessPoolExecutor
no funcionará en el intérprete interactivo.
Llamar a métodos de Executor
o Future
desde el invocable enviado a ProcessPoolExecutor
resultará en bloqueos mutuos.
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
Subclase de
Executor
que ejecuta llamadas asincrónicas mediante un grupo de, como máximo, max_workers procesos. Si max_workers esNone
o no fue especificado, el número predeterminado será la cantidad de procesadores de la máquina, Si max_workers es menor o igual a0
, la excepciónValueError
será lanzada. En Windows, max_workers debe ser menor o igual a61
. Si no es así, la excepciónValueError
será lanzada. Si max_workers esNone
, el número predeterminado será61
como máximo, aún si existen más procesadores disponibles. mp_context puede ser un contexto de multiprocesamiento oNone
y será utilizado para iniciar los trabajadores. Si mp_context esNone
o no es especificado, se utilizará el contexto predeterminado de multiprocesamiento.initializer es un invocable opcional que es llamado al comienzo de cada proceso trabajador; initargs es una tupla de argumentos pasados al inicializador. Si el initializer lanza una excepción, todos los trabajos actualmente pendientes lanzarán
BrokenProcessPool
, así como cualquier intento de enviar más trabajos al grupo.max_tasks_per_child es un argumento opcional que especifica el número máximo de tareas que un único proceso puede ejecutar antes de salir y ser reemplazado por un nuevo proceso de trabajo. Por defecto, max_tasks_per_child es
None
, lo que significa que los procesos de trabajo vivirán tanto como el pool. Cuando se especifica un máximo, el método de inicio de multiprocesamiento «spawn» se utilizará por defecto en ausencia de un parámetro mp_context. Esta característica es incompatible con el método de inicio «fork».Distinto en la versión 3.3: When one of the worker processes terminates abruptly, a
BrokenProcessPool
error is now raised. Previously, behaviour was undefined but operations on the executor or its futures would often freeze or deadlock.Distinto en la versión 3.7: El argumento mp_context se agregó para permitir a los usuarios controlar el método de iniciación para procesos de trabajo creados en el grupo.
Se agregaron los argumentos initializer y initargs.
Distinto en la versión 3.11: The max_tasks_per_child argument was added to allow users to control the lifetime of workers in the pool.
Ejemplo de ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Objetos futuro¶
La clase Future
encapsula la ejecución asincrónica del invocable. Las instancias de Future
son creadas por Executor.submit()
.
- class concurrent.futures.Future¶
Encapsula la ejecución asincrónica del invocable. Las instancias de
Future
son creadas porExecutor.submit()
y no deberían ser creadas directamente, excepto para pruebas.- cancel()¶
Intenta cancelar la llamada. Si el invocable está siendo ejecutado o ha finalizado su ejecución y no puede ser cancelado el método retornará
False
, de lo contrario la llamada será cancelada y el método retornaráTrue
.
- cancelled()¶
Retorna
True
si la llamada fue cancelada exitosamente.
- running()¶
Retorna
True
si la llamada está siendo ejecutada y no puede ser cancelada.
- done()¶
Retorna
True
si la llamada fue cancelada exitosamente o terminó su ejecución.
- result(timeout=None)¶
Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a
TimeoutError
will be raised. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.Si el futuro es cancelado antes de finalizar su ejecución,
CancelledError
será lanzada.Si la llamada lanzó una excepción, este método lanzará la misma excepción.
- exception(timeout=None)¶
Return the exception raised by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a
TimeoutError
will be raised. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.Si el futuro es cancelado antes de finalizar su ejecución,
CancelledError
será lanzada.Si la llamada es completada sin excepciones, se retornará
`None
.
- add_done_callback(fn)¶
Asocia el invocable fn al futuro. fn va a ser llamada, con el futuro como su único argumento, cuando el futuro sea cancelado o finalice su ejecución.
Los invocables agregados se llaman en el orden en que se agregaron y siempre se llaman en un hilo que pertenece al proceso que los agregó. Si el invocable genera una subclase
Exception
, se registrará y se ignorará. Si el invocable genera una subclaseBaseException
, el comportamiento no está definido.Si el futuro ya ha finalizado su ejecución o fue cancelado, fn retornará inmediatamente.
Los siguientes métodos de
Future
están pensados para ser usados en pruebas unitarias e implementaciones deExecutor
.- set_running_or_notify_cancel()¶
Este método sólo debe ser llamado en implementaciones de
Executor
antes de ejecutar el trabajo asociado alFuture
y por las pruebas unitarias.If the method returns
False
then theFuture
was cancelled, i.e.Future.cancel()
was called and returnedTrue
. Any threads waiting on theFuture
completing (i.e. throughas_completed()
orwait()
) will be woken up.If the method returns
True
then theFuture
was not cancelled and has been put in the running state, i.e. calls toFuture.running()
will returnTrue
.Este método solo puede ser llamado una sola vez y no puede ser llamado luego de haber llamado a
Future.set_result()
o aFuture.set_exception()
.
- set_result(result)¶
Establece result como el resultado del trabajo asociado al
Future
.Este método solo debe ser usado por implementaciones de
Executor
y pruebas unitarias.Distinto en la versión 3.8: Este método lanza
concurrent.futures.InvalidStateError
siFuture
ya ha finalizado su ejecución.
- set_exception(exception)¶
Establece exception, subclase de
Exception
, como el resultado del trabajo asociado alFuture
.Este método solo debe ser usado por implementaciones de
Executor
y pruebas unitarias.Distinto en la versión 3.8: Este método lanza
concurrent.futures.InvalidStateError
siFuture
ya ha finalizado su ejecución.
Funciones del módulo¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
Wait for the
Future
instances (possibly created by differentExecutor
instances) given by fs to complete. Duplicate futures given to fs are removed and will be returned only once. Returns a named 2-tuple of sets. The first set, nameddone
, contains the futures that completed (finished or cancelled futures) before the wait completed. The second set, namednot_done
, contains the futures that did not complete (pending or running futures).El argumento timeout puede ser usado para controlar la espera máxima en segundos antes de retornar. timeout puede ser un int o un float. Si timeout no es especificado o es
None
, no hay limite en el tiempo de espera.return_when indica cuando debe retornar esta función. Debe ser alguna de las siguientes constantes:
Constante
Descripción
- concurrent.futures.FIRST_COMPLETED¶
La función retornará cuando cualquier futuro finalice o sea cancelado.
- concurrent.futures.FIRST_EXCEPTION¶
The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to
ALL_COMPLETED
.- concurrent.futures.ALL_COMPLETED¶
La función retornará cuando todos los futuros finalicen o sean cancelados.
- concurrent.futures.as_completed(fs, timeout=None)¶
Returns an iterator over the
Future
instances (possibly created by differentExecutor
instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed beforeas_completed()
is called will be yielded first. The returned iterator raises aTimeoutError
if__next__()
is called and the result isn’t available after timeout seconds from the original call toas_completed()
. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.
Ver también
- PEP 3148 – futuros - ejecutar cómputos asincrónicamente
La propuesta que describe esta propuesta de inclusión en la biblioteca estándar de Python.
Clases de Excepciones¶
- exception concurrent.futures.CancelledError¶
Lanzada cuando un futuro es cancelado.
- exception concurrent.futures.TimeoutError¶
A deprecated alias of
TimeoutError
, raised when a future operation exceeds the given timeout.Distinto en la versión 3.11: Esta clase se convirtió en un alias de
TimeoutError
.
- exception concurrent.futures.BrokenExecutor¶
Derivada de
RuntimeError
, esta excepción es lanzada cuando un ejecutor se encuentra corrupto por algún motivo y no puede ser utilizado para enviar o ejecutar nuevas tareas.Nuevo en la versión 3.7.
- exception concurrent.futures.InvalidStateError¶
Lanzada cuando una operación es realizada sobre un futuro que no permite dicha operación en el estado actual.
Nuevo en la versión 3.8.
- exception concurrent.futures.thread.BrokenThreadPool¶
Derived from
BrokenExecutor
, this exception class is raised when one of the workers of aThreadPoolExecutor
has failed initializing.Nuevo en la versión 3.7.
- exception concurrent.futures.process.BrokenProcessPool¶
Derived from
BrokenExecutor
(formerlyRuntimeError
), this exception class is raised when one of the workers of aProcessPoolExecutor
has terminated in a non-clean fashion (for example, if it was killed from the outside).Nuevo en la versión 3.3.