Colas

Código fuente: Lib/asyncio/queue.py


Las colas asyncio son diseñadas para ser similares a clases del módulo queue. Sin embargo las colas asyncio no son seguras para hilos, son diseñadas para usar específicamente en código async/await.

Nota que los métodos de colas de asyncio no tienen un parámetro timeout; usa la función asyncio.wait_for() para hacer operaciones de cola con un tiempo de espera.

Ver también la sección Examples a continuación.

Cola

class asyncio.Queue(maxsize=0, *, loop=None)

Una cola primero en entrar, primero en salir (PEPS, o FIFO en inglés).

Si maxsize es menor que o igual a cero, el tamaño de la cola es infinito. Si es un entero mayor a 0, entonces await put() se bloquea cuando una cola alcanza su maxsize hasta que un elemento es removido por get().

Diferente de los subprocesos de la biblioteca estándar queue, el tamaño de la cola siempre es conocido y puede ser retornado llamando el método qsize().

Deprecated since version 3.8, will be removed in version 3.10: El parámetro loop.

Esta clase es no segura para hilos.

maxsize

Número de ítems permitidos en la cola.

empty()

Retorna True si la cola es vacía, o False en caso contrario.

full()

Retorna True si hay maxsize ítems en la cola.

Si la cola fue inicializada con maxsize=0 (el predeterminado), entonces fill() nunca retorna True.

coroutine get()

Remueve y retorna un ítem de la cola. Si la cola es vacía, espera hasta que un ítem esté disponible.

get_nowait()

Retorna un ítem si uno está inmediatamente disponible, de otra manera levanta QueueEmpty.

coroutine join()

Se bloquea hasta que todos los ítems en la cola han sido recibidos y procesados.

El conteo de tareas no terminadas sube siempre que un ítem es agregado a la cola. El conteo baja siempre que la ejecución de una corrutina task_done() para indicar que el ítem fue recuperado y todo el trabajo en él está completo. Cuando el conteo de tareas inacabadas llega a cero, join() se desbloquea.

coroutine put(item)

Pone un ítem en la cola. Si la cola está completa, espera hasta que una entrada vacía esté disponible antes de agregar el ítem.

put_nowait(item)

Pone un ítem en la cola sin bloquearse.

Si no hay inmediatamente disponibles entradas vacías, lanza QueueFull.

qsize()

Retorna el número de ítems en la cola.

task_done()

Indica que una tarea formalmente en cola está completa.

Usada por consumidores de la cola. Para cada get() usado para buscar una tarea, una ejecución subsecuente a task_done() dice a la cola que el procesamiento de la tarea está completo.

Si un join() está actualmente bloqueando, éste se resumirá cuando todos los ítems han sido procesados (implicado que un método task_done() fue recibido por cada ítem que ha sido put() en la cola.

Lanza ValueError si fue llamado más veces que los ítems en la cola.

Cola de prioridad

class asyncio.PriorityQueue

Una variante de Queue; recupera entradas en su orden de prioridad (el más bajo primero).

Las entradas son típicamente tuplas de la forma (priority_number, data).

Cola UEPA (o LIFO en inglés)

class asyncio.LifoQueue

Una variante de una Queue que recupera primero el elemento agregado más reciente (último en entrar, primero en salir).

Excepciones

exception asyncio.QueueEmpty

Esta excepción es lanzada cuando el método get_nowait() es ejecutado en una cola vacía.

exception asyncio.QueueFull

Las excepciones son lanzadas cuando el método put_nowait() es lanzado en una cola que ha alcanzado su maxsize.

Ejemplos

Las colas pueden ser usadas para distribuir cargas de trabajo entre múltiples tareas concurrentes:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())