Colas¶
Código fuente: Lib/asyncio/queue.py
Las colas asyncio son diseñadas para ser similares a clases del módulo queue
. Sin embargo las colas asyncio no son seguras para hilos, son diseñadas para usar específicamente en código async/await.
Nota que los métodos de colas de asyncio no tienen un parámetro timeout; usa la función asyncio.wait_for()
para hacer operaciones de cola con un tiempo de espera.
Ver también la sección Examples a continuación.
Cola¶
-
class
asyncio.
Queue
(maxsize=0)¶ Una cola primero en entrar, primero en salir (PEPS, o FIFO en inglés).
Si maxsize es menor que o igual a cero, el tamaño de la cola es infinito. Si es un entero mayor a
0
, entoncesawait put()
se bloquea cuando una cola alcanza su maxsize hasta que un elemento es removido porget()
.Diferente de los subprocesos de la biblioteca estándar
queue
, el tamaño de la cola siempre es conocido y puede ser retornado llamando el métodoqsize()
.Distinto en la versión 3.10: Removed the loop parameter.
Esta clase es no segura para hilos.
-
maxsize
¶ Número de ítems permitidos en la cola.
-
empty
()¶ Retorna
True
si la cola es vacía, oFalse
en caso contrario.
-
full
()¶ Retorna
True
si haymaxsize
ítems en la cola.Si la cola fue inicializada con
maxsize=0
(el predeterminado), entoncesfill()
nunca retornaTrue
.
-
coroutine
get
()¶ Remueve y retorna un ítem de la cola. Si la cola es vacía, espera hasta que un ítem esté disponible.
-
get_nowait
()¶ Retorna un ítem si uno está inmediatamente disponible, de otra manera levanta
QueueEmpty
.
-
coroutine
join
()¶ Se bloquea hasta que todos los ítems en la cola han sido recibidos y procesados.
El conteo de tareas no terminadas sube siempre que un ítem es agregado a la cola. El conteo baja siempre que la ejecución de una corrutina
task_done()
para indicar que el ítem fue recuperado y todo el trabajo en él está completo. Cuando el conteo de tareas inacabadas llega a cero,join()
se desbloquea.
-
coroutine
put
(item)¶ Pone un ítem en la cola. Si la cola está completa, espera hasta que una entrada vacía esté disponible antes de agregar el ítem.
-
put_nowait
(item)¶ Pone un ítem en la cola sin bloquearse.
Si no hay inmediatamente disponibles entradas vacías, lanza
QueueFull
.
-
qsize
()¶ Retorna el número de ítems en la cola.
-
task_done
()¶ Indica que una tarea formalmente en cola está completa.
Usada por consumidores de la cola. Para cada
get()
usado para buscar una tarea, una ejecución subsecuente atask_done()
dice a la cola que el procesamiento de la tarea está completo.Si un
join()
está actualmente bloqueando, éste se resumirá cuando todos los ítems han sido procesados (implicado que un métodotask_done()
fue recibido por cada ítem que ha sidoput()
en la cola.Lanza
ValueError
si fue llamado más veces que los ítems en la cola.
-
Cola de prioridad¶
Cola UEPA (o LIFO en inglés)¶
Excepciones¶
-
exception
asyncio.
QueueEmpty
¶ Esta excepción es lanzada cuando el método
get_nowait()
es ejecutado en una cola vacía.
-
exception
asyncio.
QueueFull
¶ Las excepciones son lanzadas cuando el método
put_nowait()
es lanzado en una cola que ha alcanzado su maxsize.
Ejemplos¶
Las colas pueden ser usadas para distribuir cargas de trabajo entre múltiples tareas concurrentes:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())