concurrent.futures — Lanzamiento de tareas paralelas

Nuevo en la versión 3.2.

Código fuente: Lib/concurrent/futures/thread.py y Lib/concurrent/futures/process.py


El módulo concurrent.futures provee una interfaz de alto nivel para ejecutar invocables de forma asincrónica.

La ejecución asincrónica se puede realizar mediante hilos, usando ThreadPoolExecutor, o procesos independientes, mediante ProcessPoolExecutor. Ambos implementan la misma interfaz, que se encuentra definida por la clase abstracta Executor.

Disponibilidad: no Emscripten, no WASI.

Este módulo no funciona o no está disponible en las plataformas WebAssembly wasm32-emscripten y wasm32-wasi. Consulte Plataformas WebAssembly para obtener más información.

Objetos ejecutores

class concurrent.futures.Executor

Una clase abstracta que proporciona métodos para ejecutar llamadas de forma asincrónica. No debe ser utilizada directamente, sino a través de sus subclases.

submit(fn, /, *args, **kwargs)

Programa la invocación de fn, que será ejecutada como fn(*args, **kwargs) y retorna un objeto Future que representa la ejecución del invocable.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

Similar a map(func, *iterables) excepto que:

  • los iterables son recolectados inmediatamente, en lugar de perezosamente;

  • func se ejecuta de forma asincrónica y se pueden realizar varias llamadas a func simultáneamente.

El iterador retornado lanza TimeoutError si __next__() es llamado y el resultado no está disponible luego de timeout segundos luego de la llamada original a Executor.map(). timeout puede ser un int o un float. Si no se provee un timeout o es None, no hay limite de espera.

Si una llamada a func lanza una excepción, dicha excepción va a ser lanzada cuando su valor sea retornado por el iterador.

Al usar ProcessPoolExecutor, este método divide los iterables en varios fragmentos que luego envía al grupo como tareas separadas. El tamaño (aproximado) de estos fragmentos puede especificarse estableciendo chunksize a un entero positivo. El uso de un valor grande para chunksize puede mejorar significativamente el rendimiento en comparación con el tamaño predeterminado de 1. Con ThreadPoolExecutor, el chunksize no tiene ningún efecto.

Distinto en la versión 3.5: Se agregó el argumento chunksize.

shutdown(wait=True, *, cancel_futures=False)

Indica al ejecutor que debe liberar todos los recursos que está utilizando cuando los futuros actualmente pendientes de ejecución finalicen. Las llamadas a Executor.submit() y Executor.map() realizadas después del apagado lanzarán RuntimeError.

Si wait es True este método no retornará hasta que todos los futuros pendientes hayan terminado su ejecución y los recursos asociados al ejecutor hayan sido liberados. Si wait es False, este método retornará de inmediato y los recursos asociados al ejecutor se liberarán cuando todos los futuros asociados hayan finalizado su ejecución. Independientemente del valor de wait, el programa Python entero no finalizará hasta que todos los futuros pendientes hayan finalizado su ejecución.

Si cancel_futures es True, este método cancelará todos los futuros pendientes que el ejecutor no ha comenzado a ejecutar. Los futuros que se completen o estén en ejecución no se cancelarán, independientemente del valor de cancel_futures.

Si tanto cancel_futures como wait son True, todos los futuros que el ejecutor ha comenzado a ejecutar se completarán antes de que regrese este método. Los futuros restantes se cancelan.

Se puede evitar tener que llamar este método explícitamente si se usa la sentencia with, la cual apagará el Executor (esperando como si Executor.shutdown() hubiera sido llamado con wait en True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Distinto en la versión 3.9: Agregado cancel_futures.

ThreadPoolExecutor

ThreadPoolExecutor es una subclase de Executor que usa un grupo de hilos para ejecutar llamadas de forma asincrónica.

Pueden ocurrir bloqueos mutuos cuando la llamada asociada a un Future espera el resultado de otro Future. Por ejemplo:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

Y:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Subclase de Executor que utiliza un grupo de hilos de max_workers como máximo para ejecutar llamadas de forma asincrónica.

Todos los hilos que se hayan puesto en cola en ThreadPoolExecutor se unirán antes de que el intérprete pueda salir. Tenga en cuenta que el controlador de salida que hace esto se ejecuta antes de cualquier controlador de salida añadido mediante atexit. Esto significa que las excepciones en el hilo principal deben ser capturadas y manejadas para indicar a los hilos que salgan correctamente. Por esta razón, se recomienda no utilizar ThreadPoolExecutor para tareas de larga duración.

initializer es un invocable opcional que es llamado al comienzo de cada hilo de trabajo; initargs es una tupla de argumentos pasados al inicializador. Si el initializer lanza una excepción, todos los trabajos actualmente pendientes lanzarán BrokenThreadPool, así como cualquier intento de enviar más trabajos al grupo.

Distinto en la versión 3.5: Si max_workers es None o no es especificado, se tomará por defecto el número de procesadores de la máquina, multiplicado por 5, asumiendo que ThreadPoolExecutor a menudo se utiliza para paralelizar E/S en lugar de trabajo de CPU y que el numero de trabajadores debe ser mayor que el número de trabajadores para ProcessPoolExecutor.

Nuevo en la versión 3.6: El argumento thread_name_prefix fue añadido para permitir al usuario controlar los nombres asignados a los threading.Thread creados por el grupo para facilitar la depuración del programa.

Distinto en la versión 3.7: Se agregaron los argumentos initializer y initargs.

Distinto en la versión 3.8: El valor predeterminado de max_workers fue reemplazado por min(32, os.cpu_count() + 4). Este valor predeterminado conserva al menos 5 trabajadores para las tareas vinculadas de E/S. Utiliza como máximo 32 núcleos de CPU para tareas vinculadas a la CPU que liberan el GIL. Y evita utilizar recursos muy grandes implícitamente en máquinas de muchos núcleos.

ThreadPoolExecutor ahora también reutiliza hilos inactivos antes de crear max_workers hilos de trabajo.

Distinto en la versión 3.13: Default value of max_workers is changed to min(32, (os.process_cpu_count() or 1) + 4).

Ejemplo de ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

La clase ProcessPoolExecutor es una subclase Executor que usa un grupo de procesos para ejecutar llamadas de forma asíncrona. ProcessPoolExecutor usa el módulo multiprocessing, que le permite eludir el Global Interpreter Lock pero también significa que solo los objetos seleccionables pueden ser ejecutados y retornados.

El módulo __main__ debe ser importable por los subprocesos trabajadores. Esto significa que ProcessPoolExecutor no funcionará en el intérprete interactivo.

Llamar a métodos de Executor o Future desde el invocable enviado a ProcessPoolExecutor resultará en bloqueos mutuos.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to os.process_cpu_count(). If max_workers is less than or equal to 0, then a ValueError will be raised. On Windows, max_workers must be less than or equal to 61. If it is not then ValueError will be raised. If max_workers is None, then the default chosen will be at most 61, even if more processors are available. mp_context can be a multiprocessing context or None. It will be used to launch the workers. If mp_context is None or not given, the default multiprocessing context is used. See Contextos y métodos de inicio.

initializer es un invocable opcional que es llamado al comienzo de cada proceso trabajador; initargs es una tupla de argumentos pasados al inicializador. Si el initializer lanza una excepción, todos los trabajos actualmente pendientes lanzarán BrokenProcessPool, así como cualquier intento de enviar más trabajos al grupo.

max_tasks_per_child es un argumento opcional que especifica el número máximo de tareas que un único proceso puede ejecutar antes de salir y ser reemplazado por un nuevo proceso de trabajo. Por defecto, max_tasks_per_child es None, lo que significa que los procesos de trabajo vivirán tanto como el pool. Cuando se especifica un máximo, el método de inicio de multiprocesamiento «spawn» se utilizará por defecto en ausencia de un parámetro mp_context. Esta característica es incompatible con el método de inicio «fork».

Distinto en la versión 3.3: Cuando uno de los procesos finaliza abruptamente, se lanzará BrokenProcessPool. Anteriormente, el comportamiento no estaba definido, pero las operaciones en el ejecutor o sus futuros a menudo se detenían o bloqueaban mutuamente.

Distinto en la versión 3.7: El argumento mp_context se agregó para permitir a los usuarios controlar el método de iniciación para procesos de trabajo creados en el grupo.

Se agregaron los argumentos initializer y initargs.

Nota

El método de inicio por defecto de multiprocessing (ver Contextos y métodos de inicio) dejará de ser fork en Python 3.14. El código que requiera el uso de fork para su ProcessPoolExecutor debe especificarlo explícitamente pasando un parámetro mp_context=multiprocessing.get_context("fork").

Distinto en la versión 3.11: El argumento max_tasks_per_child se agregó para permitir a los usuarios controlar el tiempo de vida para procesos de trabajo creados en el grupo.

Distinto en la versión 3.12: En sistemas POSIX, si su aplicación tiene múltiples hilos y el contexto multiprocessing utiliza el método de inicio "fork": La función os.fork() llamada internamente para generar trabajadores puede lanzar un mensaje DeprecationWarning. Pase un mp_context configurado para utilizar un método de inicio diferente. Consulte la documentación de os.fork() para más información.

Distinto en la versión 3.13: max_workers uses os.process_cpu_count() by default, instead of os.cpu_count().

Ejemplo de ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Objetos futuro

La clase Future encapsula la ejecución asincrónica del invocable. Las instancias de Future son creadas por Executor.submit().

class concurrent.futures.Future

Encapsula la ejecución asincrónica del invocable. Las instancias de Future son creadas por Executor.submit() y no deberían ser creadas directamente, excepto para pruebas.

cancel()

Intenta cancelar la llamada. Si el invocable está siendo ejecutado o ha finalizado su ejecución y no puede ser cancelado el método retornará False, de lo contrario la llamada será cancelada y el método retornará True.

cancelled()

Retorna True si la llamada fue cancelada exitosamente.

running()

Retorna True si la llamada está siendo ejecutada y no puede ser cancelada.

done()

Retorna True si la llamada fue cancelada exitosamente o terminó su ejecución.

result(timeout=None)

Retorna el valor retornado por la llamada. Si la llamada aún no se ha completado, este método esperará hasta timeout segundos. Si la llamada no se ha completado en timeout segundos, entonces se lanzará un TimeoutError. timeout puede ser un int o un float. Si timeout no se especifica o es None, no hay límite para el tiempo de espera.

Si el futuro es cancelado antes de finalizar su ejecución, CancelledError será lanzada.

Si la llamada lanzó una excepción, este método lanzará la misma excepción.

exception(timeout=None)

Retorna la excepción lanzada por la llamada. Si la llamada aún no se ha completado, este método esperará hasta timeout segundos. Si la llamada no se ha completado en timeout segundos, entonces se lanzará un TimeoutError. timeout puede ser un int o un float. Si timeout no se especifica o es None, no hay límite para el tiempo de espera.

Si el futuro es cancelado antes de finalizar su ejecución, CancelledError será lanzada.

Si la llamada es completada sin excepciones, se retornará `None.

add_done_callback(fn)

Asocia el invocable fn al futuro. fn va a ser llamada, con el futuro como su único argumento, cuando el futuro sea cancelado o finalice su ejecución.

Los invocables agregados se llaman en el orden en que se agregaron y siempre se llaman en un hilo que pertenece al proceso que los agregó. Si el invocable genera una subclase Exception, se registrará y se ignorará. Si el invocable genera una subclase BaseException, el comportamiento no está definido.

Si el futuro ya ha finalizado su ejecución o fue cancelado, fn retornará inmediatamente.

Los siguientes métodos de Future están pensados para ser usados en pruebas unitarias e implementaciones de Executor.

set_running_or_notify_cancel()

Este método sólo debe ser llamado en implementaciones de Executor antes de ejecutar el trabajo asociado al Future y por las pruebas unitarias.

Si el método retorna False entonces el Future fue cancelado, es decir, Future.cancel() fue llamado y retornó True. Cualquier hilo que esté esperando a que el Future se complete (es decir, a través de as_completed() o wait()) será despertado.

Si el método retorna True entonces el Future no se ha cancelado y se ha puesto en estado de ejecución, es decir, las llamadas a Future.running`() retornarán True.

Este método solo puede ser llamado una sola vez y no puede ser llamado luego de haber llamado a Future.set_result() o a Future.set_exception().

set_result(result)

Establece result como el resultado del trabajo asociado al Future.

Este método solo debe ser usado por implementaciones de Executor y pruebas unitarias.

Distinto en la versión 3.8: Este método lanza concurrent.futures.InvalidStateError si Future ya ha finalizado su ejecución.

set_exception(exception)

Establece exception, subclase de Exception, como el resultado del trabajo asociado al Future.

Este método solo debe ser usado por implementaciones de Executor y pruebas unitarias.

Distinto en la versión 3.8: Este método lanza concurrent.futures.InvalidStateError si Future ya ha finalizado su ejecución.

Funciones del módulo

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Espera a que se completen las instancias Future (posiblemente creadas por diferentes instancias Executor) dadas por fs. Los futuros duplicados dados a fs se eliminan y sólo se devolverán una vez. Retorna una tupla nombrada de 2 conjuntos. El primer conjunto, llamado done, contiene los futuros que se han completado (futuros finalizados o cancelados) antes de que se complete la espera. El segundo conjunto, llamado not_done, contiene los futuros que no se completaron (futuros pendientes o en ejecución).

El argumento timeout puede ser usado para controlar la espera máxima en segundos antes de retornar. timeout puede ser un int o un float. Si timeout no es especificado o es None, no hay limite en el tiempo de espera.

return_when indica cuando debe retornar esta función. Debe ser alguna de las siguientes constantes:

Constante

Descripción

FIRST_COMPLETED

La función retornará cuando cualquier futuro finalice o sea cancelado.

FIRST_EXCEPTION

La función retornará cuando cualquier futuro finalice lanzando una excepción. Si ningún futuro lanza una excepción, esta opción es equivalente a ALL_COMPLETED.

ALL_COMPLETED

La función retornará cuando todos los futuros finalicen o sean cancelados.

concurrent.futures.as_completed(fs, timeout=None)

Retorna un iterador sobre las instancias de Future (posiblemente creadas por distintas instancias de Executor) dadas por fs que produce futuros a medida que van finalizando (normalmente o cancelados). Cualquier futuro dado por fs que esté duplicado será retornado una sola vez. Los futuros que hayan finalizado antes de la llamada a as_completed() serán entregados primero. El iterador retornado lanzará TimeoutError si __next__() es llamado y el resultado no está disponible luego de timeout segundos a partir de la llamada original a as_completed(). timeout puede ser un int o un float. Si timeout no es especificado o es None, no hay límite en el tiempo de espera.

Ver también

PEP 3148 – futuros - ejecutar cómputos asincrónicamente

La propuesta que describe esta propuesta de inclusión en la biblioteca estándar de Python.

Clases de Excepciones

exception concurrent.futures.CancelledError

Lanzada cuando un futuro es cancelado.

exception concurrent.futures.TimeoutError

Un alias obsoleto de TimeoutError, lanzado cuando una operación en un futuro excede el tiempo de espera dado.

Distinto en la versión 3.11: Esta clase se convirtió en un alias de TimeoutError.

exception concurrent.futures.BrokenExecutor

Derivada de RuntimeError, esta excepción es lanzada cuando un ejecutor se encuentra corrupto por algún motivo y no puede ser utilizado para enviar o ejecutar nuevas tareas.

Nuevo en la versión 3.7.

exception concurrent.futures.InvalidStateError

Lanzada cuando una operación es realizada sobre un futuro que no permite dicha operación en el estado actual.

Nuevo en la versión 3.8.

exception concurrent.futures.thread.BrokenThreadPool

Derivada de BrokenExecutor, esta excepción es lanzada cuando uno de los trabajadores de ThreadPoolExecutor ha fallado en su inicialización.

Nuevo en la versión 3.7.

exception concurrent.futures.process.BrokenProcessPool

Derivada de BrokenExecutor (previamente RuntimeError), esta excepción es lanzada cuando uno de los trabajadores de ProcessPoolExecutor ha finalizado de forma abrupta (por ejemplo, al ser terminado desde afuera del proceso).

Nuevo en la versión 3.3.