Colas¶
Código fuente: Lib/asyncio/queue.py
Las colas asyncio son diseñadas para ser similares a clases del módulo queue
. Sin embargo las colas asyncio no son seguras para hilos, son diseñadas para usar específicamente en código async/await.
Nota que los métodos de colas de asyncio no tienen un parámetro timeout; usa la función asyncio.wait_for()
para hacer operaciones de cola con un tiempo de espera.
Ver también la sección Examples a continuación.
Cola¶
- class asyncio.Queue(maxsize=0)¶
Una cola primero en entrar, primero en salir (PEPS, o FIFO en inglés).
Si maxsize es menor que o igual a cero, el tamaño de la cola es infinito. Si es un entero mayor a
0
, entoncesawait put()
se bloquea cuando una cola alcanza su maxsize hasta que un elemento es removido porget()
.Diferente de los subprocesos de la biblioteca estándar
queue
, el tamaño de la cola siempre es conocido y puede ser retornado llamando el métodoqsize()
.Distinto en la versión 3.10: Excluido el parámetro loop.
Esta clase es no segura para hilos.
- maxsize¶
Número de ítems permitidos en la cola.
- empty()¶
Retorna
True
si la cola es vacía, oFalse
en caso contrario.
- full()¶
Retorna
True
si haymaxsize
ítems en la cola.If the queue was initialized with
maxsize=0
(the default), thenfull()
never returnsTrue
.
- coroutine get()¶
Remueve y retorna un ítem de la cola. Si la cola es vacía, espera hasta que un ítem esté disponible.
Raises
QueueShutDown
if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait()¶
Retorna un ítem si uno está inmediatamente disponible, de otra manera levanta
QueueEmpty
.
- coroutine join()¶
Se bloquea hasta que todos los ítems en la cola han sido recibidos y procesados.
El conteo de tareas no terminadas sube siempre que un ítem es agregado a la cola. El conteo baja siempre que la ejecución de una corrutina
task_done()
para indicar que el ítem fue recuperado y todo el trabajo en él está completo. Cuando el conteo de tareas inacabadas llega a cero,join()
se desbloquea.
- coroutine put(item)¶
Pone un ítem en la cola. Si la cola está completa, espera hasta que una entrada vacía esté disponible antes de agregar el ítem.
Raises
QueueShutDown
if the queue has been shut down.
- put_nowait(item)¶
Pone un ítem en la cola sin bloquearse.
Si no hay inmediatamente disponibles entradas vacías, lanza
QueueFull
.
- qsize()¶
Retorna el número de ítems en la cola.
- shutdown(immediate=False)¶
Shut down the queue, making
get()
andput()
raiseQueueShutDown
.By default,
get()
on a shut down queue will only raise once the queue is empty. Set immediate to true to makeget()
raise immediately instead.All blocked callers of
put()
andget()
will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers ofjoin()
.Added in version 3.13.
- task_done()¶
Indicate that a formerly enqueued work item is complete.
Used by queue consumers. For each
get()
used to fetch a work item, a subsequent call totask_done()
tells the queue that the processing on the work item is complete.Si un
join()
está actualmente bloqueando, éste se resumirá cuando todos los ítems han sido procesados (implicado que un métodotask_done()
fue recibido por cada ítem que ha sidoput()
en la cola.shutdown(immediate=True)
callstask_done()
for each remaining item in the queue.Lanza
ValueError
si fue llamado más veces que los ítems en la cola.
Cola de prioridad¶
Cola UEPA (o LIFO en inglés)¶
Excepciones¶
- exception asyncio.QueueEmpty¶
Esta excepción es lanzada cuando el método
get_nowait()
es ejecutado en una cola vacía.
- exception asyncio.QueueFull¶
Las excepciones son lanzadas cuando el método
put_nowait()
es lanzado en una cola que ha alcanzado su maxsize.
Ejemplos¶
Las colas pueden ser usadas para distribuir cargas de trabajo entre múltiples tareas concurrentes:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())