Colas¶
Código fuente: Lib/asyncio/queue.py
Las colas asyncio son diseñadas para ser similares a clases del módulo queue
. Sin embargo las colas asyncio no son seguras para hilos, son diseñadas para usar específicamente en código async/await.
Nota que los métodos de colas de asyncio no tienen un parámetro timeout; usa la función asyncio.wait_for()
para hacer operaciones de cola con un tiempo de espera.
Ver también la sección Examples a continuación.
Cola¶
- class asyncio.Queue(maxsize=0)¶
Una cola primero en entrar, primero en salir (PEPS, o FIFO en inglés).
Si maxsize es menor que o igual a cero, el tamaño de la cola es infinito. Si es un entero mayor a
0
, entoncesawait put()
se bloquea cuando una cola alcanza su maxsize hasta que un elemento es removido porget()
.Diferente de los subprocesos de la biblioteca estándar
queue
, el tamaño de la cola siempre es conocido y puede ser retornado llamando el métodoqsize()
.Distinto en la versión 3.10: Excluido el parámetro loop.
Esta clase es no segura para hilos.
- maxsize¶
Número de ítems permitidos en la cola.
- empty()¶
Retorna
True
si la cola es vacía, oFalse
en caso contrario.
- full()¶
Retorna
True
si haymaxsize
ítems en la cola.Si la cola fue inicializada con
maxsize=0
(el predeterminado), entoncesfill()
nunca retornaTrue
.
- coroutine get()¶
Remueve y retorna un ítem de la cola. Si la cola es vacía, espera hasta que un ítem esté disponible.
- get_nowait()¶
Retorna un ítem si uno está inmediatamente disponible, de otra manera levanta
QueueEmpty
.
- coroutine join()¶
Se bloquea hasta que todos los ítems en la cola han sido recibidos y procesados.
El conteo de tareas no terminadas sube siempre que un ítem es agregado a la cola. El conteo baja siempre que la ejecución de una corrutina
task_done()
para indicar que el ítem fue recuperado y todo el trabajo en él está completo. Cuando el conteo de tareas inacabadas llega a cero,join()
se desbloquea.
- coroutine put(item)¶
Pone un ítem en la cola. Si la cola está completa, espera hasta que una entrada vacía esté disponible antes de agregar el ítem.
- put_nowait(item)¶
Pone un ítem en la cola sin bloquearse.
Si no hay inmediatamente disponibles entradas vacías, lanza
QueueFull
.
- qsize()¶
Retorna el número de ítems en la cola.
- task_done()¶
Indica que una tarea formalmente en cola está completa.
Usada por consumidores de la cola. Para cada
get()
usado para buscar una tarea, una ejecución subsecuente atask_done()
dice a la cola que el procesamiento de la tarea está completo.Si un
join()
está actualmente bloqueando, éste se resumirá cuando todos los ítems han sido procesados (implicado que un métodotask_done()
fue recibido por cada ítem que ha sidoput()
en la cola.Lanza
ValueError
si fue llamado más veces que los ítems en la cola.
Cola de prioridad¶
Cola UEPA (o LIFO en inglés)¶
Excepciones¶
- exception asyncio.QueueEmpty¶
Esta excepción es lanzada cuando el método
get_nowait()
es ejecutado en una cola vacía.
- exception asyncio.QueueFull¶
Las excepciones son lanzadas cuando el método
put_nowait()
es lanzado en una cola que ha alcanzado su maxsize.
Ejemplos¶
Las colas pueden ser usadas para distribuir cargas de trabajo entre múltiples tareas concurrentes:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())