concurrent.futures
— Lanzamiento de tareas paralelas¶
Nuevo en la versión 3.2.
Código fuente: Lib/concurrent/futures/thread.py y Lib/concurrent/futures/process.py
El módulo concurrent.futures
provee una interfaz de alto nivel para ejecutar invocables de forma asincrónica.
La ejecución asincrónica se puede realizar mediante hilos, usando ThreadPoolExecutor
, o procesos independientes, mediante ProcessPoolExecutor
. Ambos implementan la misma interfaz, que se encuentra definida por la clase abstracta Executor
.
Objetos Ejecutores¶
-
class
concurrent.futures.
Executor
¶ Una clase abstracta que proporciona métodos para ejecutar llamadas de forma asincrónica. No debe ser utilizada directamente, sino a través de sus subclases.
-
submit
(fn, *args, **kwargs)¶ Programa la invocación de fn, que será ejecutada como
fn(*args **kwargs)
y retorna un objetoFuture
que representa la ejecución del invocable.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
-
map
(func, *iterables, timeout=None, chunksize=1)¶ Similar a
map(func, *iterables)
excepto que:los iterables son recolectados inmediatamente, en lugar de perezosamente;
func se ejecuta de forma asincrónica y se pueden realizar varias llamadas a func simultáneamente.
El iterador retornado lanza
concurrent.futures.TimeoutError
si__next__()
es llamado y el resultado no está disponible luego de timeout segundos luego de la llamada original aExecutor.map()
. timeout puede ser un int o un float. Si no se provee un timeout o esNone
, no hay limite de espera.Si una llamada a func lanza una excepción, dicha excepción va a ser lanzada cuando su valor sea retornado por el iterador.
When using
ProcessPoolExecutor
, this method chops iterables into a number of chunks which it submits to the pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer. For very long iterables, using a large value for chunksize can significantly improve performance compared to the default size of 1. WithThreadPoolExecutor
, chunksize has no effect.Distinto en la versión 3.5: Se agregó el argumento chunksize.
-
shutdown
(wait=True)¶ Indica al ejecutor que debe liberar todos los recursos que está utilizando cuando los futuros actualmente pendientes de ejecución finalicen. Las llamadas a
Executor.submit()
yExecutor.map()
realizadas después del apagado lanzaránRuntimeError
.Si wait es
True
este método no retornará hasta que todos los futuros pendientes hayan terminado su ejecución y los recursos asociados al ejecutor hayan sido liberados. Si wait esFalse
, este método retornará de inmediato y los recursos asociados al ejecutor se liberarán cuando todos los futuros asociados hayan finalizado su ejecución. Independientemente del valor de wait, el programa Python entero no finalizará hasta que todos los futuros pendientes hayan finalizado su ejecución.You can avoid having to call this method explicitly if you use the
with
statement, which will shutdown theExecutor
(waiting as ifExecutor.shutdown()
were called with wait set toTrue
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
-
ThreadPoolExecutor¶
ThreadPoolExecutor
es una subclase de Executor
que usa un grupo de hilos para ejecutar llamadas de forma asincrónica.
Pueden ocurrir bloqueos mutuos cuando la llamada asociada a un Future
espera el resultado de otro Future
. Por ejemplo:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
Y:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
-
class
concurrent.futures.
ThreadPoolExecutor
(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶ Subclase de
Executor
que utiliza un grupo de hilos de max_workers como máximo para ejecutar llamadas de forma asincrónica.initializer es un invocable opcional que es llamado al comienzo de cada hilo de trabajo; initargs es una tupla de argumentos pasados al inicializador. Si el initializer lanza una excepción, todos los trabajos actualmente pendientes lanzarán
BrokenThreadPool
, así como cualquier intento de enviar más trabajos al grupo.Distinto en la versión 3.5: Si max_workers es
None
o no es especificado, se tomará por defecto el número de procesadores de la máquina, multiplicado por5
, asumiendo queThreadPoolExecutor
a menudo se utiliza para paralelizar E/S en lugar de trabajo de CPU y que el numero de trabajadores debe ser mayor que el número de trabajadores paraProcessPoolExecutor
.Nuevo en la versión 3.6: El argumento thread_name_prefix fue añadido para permitir al usuario controlar los nombres asignados a los
threading.Thread
creados por el grupo para facilitar la depuración del programa.Distinto en la versión 3.7: Se agregaron los argumentos initializer y initargs.
Distinto en la versión 3.8: El valor predeterminado de max_workers fue reemplazado por
min(32, os.cpu_count() + 4)
. Este valor predeterminado conserva al menos 5 trabajadores para las tareas vinculadas de E/S. Utiliza como máximo 32 núcleos de CPU para tareas vinculadas a la CPU que liberan el GIL. Y evita utilizar recursos muy grandes implícitamente en máquinas de muchos núcleos.ThreadPoolExecutor ahora también reutiliza hilos inactivos antes de crear max_workers hilos de trabajo.
Ejemplo de ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
The ProcessPoolExecutor
class is an Executor
subclass that
uses a pool of processes to execute calls asynchronously.
ProcessPoolExecutor
uses the multiprocessing
module, which
allows it to side-step the Global Interpreter Lock but also means that
only picklable objects can be executed and returned.
El módulo __main__
debe ser importable por los subprocesos trabajadores. Esto significa que ProcessPoolExecutor
no funcionará en el intérprete interactivo.
Llamar a métodos de Executor
o Future
desde el invocable enviado a ProcessPoolExecutor
resultará en bloqueos mutuos.
-
class
concurrent.futures.
ProcessPoolExecutor
(max_workers=None, mp_context=None, initializer=None, initargs=())¶ An
Executor
subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers isNone
or not given, it will default to the number of processors on the machine. If max_workers is less than or equal to0
, then aValueError
will be raised. On Windows, max_workers must be less than or equal to61
. If it is not thenValueError
will be raised. If max_workers isNone
, then the default chosen will be at most61
, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context isNone
or not given, the default multiprocessing context is used.initializer is an optional callable that is called at the start of each worker process; initargs is a tuple of arguments passed to the initializer. Should initializer raise an exception, all currently pending jobs will raise a
BrokenProcessPool
, as well as any attempt to submit more jobs to the pool.Distinto en la versión 3.3: Cuando uno de los procesos finaliza abruptamente, se lanzará
BrokenProcessPool
. Anteriormente, el comportamiento no estaba definido, pero las operaciones en el ejecutor o sus futuros a menudo se detenían o bloqueaban mutuamente.Distinto en la versión 3.7: El argumento mp_context se agregó para permitir a los usuarios controlar el método de iniciación para procesos de trabajo creados en el grupo.
Se agregaron los argumentos initializer y initargs.
Ejemplo de ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Objetos Futuro¶
La clase Future
encapsula la ejecución asincrónica del invocable. Las instancias de Future
son creadas por Executor.submit()
.
-
class
concurrent.futures.
Future
¶ Encapsula la ejecución asincrónica del invocable. Las instancias de
Future
son creadas porExecutor.submit()
y no deberían ser creadas directamente, excepto para pruebas.-
cancel
()¶ Intenta cancelar la llamada. Si el invocable está siendo ejecutado o ha finalizado su ejecución y no puede ser cancelado el método retornará
False
, de lo contrario la llamada será cancelada y el método retornaráTrue
.
-
cancelled
()¶ Retorna
True
si la llamada fue cancelada exitosamente.
-
running
()¶ Retorna
True
si la llamada está siendo ejecutada y no puede ser cancelada.
-
done
()¶ Retorna
True
si la llamada fue cancelada exitosamente o terminó su ejecución.
-
result
(timeout=None)¶ Retorna el valor retornado por la llamada. Si la llamada aún no ha finalizado, el método esperará un total de timeout segundos. Si la llamada no ha finalizado luego de timeout segundos,
concurrent.futures.TimeoutError
será lanzada. timeout puede ser un int o un float. Si timeout esNone
o no fue especificado, no hay limite de espera.Si el futuro es cancelado antes de finalizar su ejecución,
CancelledError
será lanzada.Si la llamada lanzó una excepción, este método lanzará la misma excepción.
-
exception
(timeout=None)¶ Retorna la excepción lanzada por la llamada. Si la llamada aún no ha finalizado, el método esperará un máximo de timeout segundos. Si la llamada aún no ha finalizado luego de timeout segundos, entonces
concurrent.futures.TimeoutError
será lanzada. timeout puede ser un int o un float. Si timeout esNone
o no es especificado, no hay limite en el tiempo de espera.Si el futuro es cancelado antes de finalizar su ejecución,
CancelledError
será lanzada.Si la llamada es completada sin excepciones, se retornará
`None
.
-
add_done_callback
(fn)¶ Asocia el invocable fn al futuro. fn va a ser llamada, con el futuro como su único argumento, cuando el futuro sea cancelado o finalice su ejecución.
Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them. If the callable raises an
Exception
subclass, it will be logged and ignored. If the callable raises aBaseException
subclass, the behavior is undefined.Si el futuro ya ha finalizado su ejecución o fue cancelado, fn retornará inmediatamente.
Los siguientes métodos de
Future
están pensados para ser usados en pruebas unitarias e implementaciones deExecutor
.-
set_running_or_notify_cancel
()¶ Este método sólo debe ser llamado en implementaciones de
Executor
antes de ejecutar el trabajo asociado alFuture
y por las pruebas unitarias.Si el método retorna
False
entoncesFuture
fue cancelado. i.e.Future.cancel()
fue llamado y retornó True. Todos los hilos esperando la finalización delFuture
(i.e. a través deas_completed()
owait()
) serán despertados.Si el método retorna
True
, entonces elFuture
no fue cancelado y ha sido colocado en estado de ejecución, i.e. las llamadas aFuture.running()
retornarán True.Este método solo puede ser llamado una sola vez y no puede ser llamado luego de haber llamado a
Future.set_result()
o aFuture.set_exception()
.
-
set_result
(result)¶ Establece result como el resultado del trabajo asociado al
Future
.Este método solo debe ser usado por implementaciones de
Executor
y pruebas unitarias.Distinto en la versión 3.8: Este método lanza
concurrent.futures.InvalidStateError
siFuture
ya ha finalizado su ejecución.
-
set_exception
(exception)¶ Establece exception, subclase de
Exception
, como el resultado del trabajo asociado alFuture
.Este método solo debe ser usado por implementaciones de
Executor
y pruebas unitarias.Distinto en la versión 3.8: Este método lanza
concurrent.futures.InvalidStateError
siFuture
ya ha finalizado su ejecución.
-
Funciones del Módulo¶
-
concurrent.futures.
wait
(fs, timeout=None, return_when=ALL_COMPLETED)¶ Espera a la finalización de las instancias de
Future
(posiblemente creadas por distintas instancias deExecutor
) dadas por fs. Retorna una tupla nombrada de 2 conjuntos. El primer conjunto, llamadodone
, contiene los futuros que finalizaron su ejecución (producto de su finalización normal o su cancelación) antes del tiempo de espera especificado. El segundo conjunto, llamadonot_done
, contiene los futuros que no finalizaron su ejecución (pueden estar pendientes o ejecutándose en ese momento).El argumento timeout puede ser usado para controlar la espera máxima en segundos antes de retornar. timeout puede ser un int o un float. Si timeout no es especificado o es
None
, no hay limite en el tiempo de espera.return_when indica cuando debe retornar esta función. Debe ser alguna de las siguientes constantes:
Constante
Descripción
FIRST_COMPLETED
La función retornará cuando cualquier futuro finalice o sea cancelado.
FIRST_EXCEPTION
La función retornará cuando cualquier futuro finalice lanzando una excepción. Si ningún futuro lanza una excepción, esta opción es equivalente a
ALL_COMPLETED
.ALL_COMPLETED
La función retornará cuando todos los futuros finalicen o sean cancelados.
-
concurrent.futures.
as_completed
(fs, timeout=None)¶ Returns an iterator over the
Future
instances (possibly created by differentExecutor
instances) given by fs that yields futures as they complete (finished or cancelled futures). Any futures given by fs that are duplicated will be returned once. Any futures that completed beforeas_completed()
is called will be yielded first. The returned iterator raises aconcurrent.futures.TimeoutError
if__next__()
is called and the result isn’t available after timeout seconds from the original call toas_completed()
. timeout can be an int or float. If timeout is not specified orNone
, there is no limit to the wait time.
Ver también
- PEP 3148 – futuros - ejecutar cómputos asincrónicamente
La propuesta que describe esta propuesta de inclusión en la biblioteca estándar de Python.
Clases de Excepciones¶
-
exception
concurrent.futures.
CancelledError
¶ Lanzada cuando un futuro es cancelado.
-
exception
concurrent.futures.
TimeoutError
¶ Lanzada cuando un futuro excede el tiempo de espera máximo.
-
exception
concurrent.futures.
BrokenExecutor
¶ Derivada de
RuntimeError
, esta excepción es lanzada cuando un ejecutor se encuentra corrupto por algún motivo y no puede ser utilizado para enviar o ejecutar nuevas tareas.Nuevo en la versión 3.7.
-
exception
concurrent.futures.
InvalidStateError
¶ Lanzada cuando una operación es realizada sobre un futuro que no permite dicha operación en el estado actual.
Nuevo en la versión 3.8.
-
exception
concurrent.futures.thread.
BrokenThreadPool
¶ Derivada de
BrokenExecutor
, esta excepción es lanzada cuando uno de los trabajadores deThreadPoolExecutor
ha fallado en su inicialización.Nuevo en la versión 3.7.
-
exception
concurrent.futures.process.
BrokenProcessPool
¶ Derivada de
BrokenExecutor
(previamenteRuntimeError
), esta excepción es lanzada cuando uno de los trabajadores deProcessPoolExecutor
ha finalizado de forma abrupta (por ejemplo, al ser terminado desde afuera del proceso).Nuevo en la versión 3.3.