Filas¶
Código-fonte: Lib/asyncio/queues.py
Filas asyncio são projetadas para serem similar a classes do módulo queue
. Apesar de filas asyncio não serem seguras para thread, elas são projetadas para serem usadas especificamente em código async/await.
Perceba que métodos de filas asyncio não possuem um parâmetro timeout; use a função asyncio.wait_for()
para realizar operações de fila com um tempo limite timeout.
Veja também a seção Exemplos abaixo.
Queue¶
- class asyncio.Queue(maxsize=0)¶
Uma fila onde o primeiro a entrar, é o primeiro a sair (FIFO - First In First Out).
Se maxsize for menor que ou igual a zero, o tamanho da fila é infinito. Se ele for um inteiro maior que
0
, entãoawait put()
bloqueia quando a fila atingir maxsize até que um item seja removido porget()
.Ao contrário da biblioteca padrão de threading
queue
, o tamanho da fila é sempre conhecido e pode ser obtido através da chamada do métodoqsize()
.Alterado na versão 3.10: Removido o parâmetro loop.
Esta classe não é segura para thread.
- maxsize¶
Número de itens permitidos na fila.
- empty()¶
Retorna
True
se a fila estiver vazia,False
caso contrário.
- full()¶
Retorna
True
se existemmaxsize
itens na fila.Se a fila foi inicializada com
maxsize=0
(o padrão), entãofull()
nunca retornaTrue
.
- coroutine get()¶
Remove e retorna um item da fila. Se a fila estiver vazia, aguarda até que um item esteja disponível.
Raises
QueueShutDown
if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait()¶
Retorna um item se houver um imediatamente disponível, caso contrário levanta
QueueEmpty
.
- coroutine join()¶
Bloqueia até que todos os itens na fila tenham sido recebidos e processados.
A contagem de tarefas inacabadas aumenta sempre que um item é adicionado à fila. A contagem diminui sempre que uma corrotina consumidora chama
task_done()
para indicar que o item foi recuperado e todo o trabalho nele foi concluído. Quando a contagem de tarefas inacabadas chega a zero,join()
desbloqueia.
- coroutine put(item)¶
Coloca um item na fila. Se a fila estiver cheia, aguarda até que uma posição livre esteja disponível antes de adicionar o item.
Raises
QueueShutDown
if the queue has been shut down.
- put_nowait(item)¶
Coloca um item na fila sem bloqueá-la.
Se nenhuma posição livre estiver imediatamente disponível, levanta
QueueFull
.
- qsize()¶
Retorna o número de itens na fila.
- shutdown(immediate=False)¶
Shut down the queue, making
get()
andput()
raiseQueueShutDown
.By default,
get()
on a shut down queue will only raise once the queue is empty. Set immediate to true to makeget()
raise immediately instead.All blocked callers of
put()
andget()
will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers ofjoin()
.Adicionado na versão 3.13.
- task_done()¶
Indica que a tarefa anteriormente enfileirada está concluída.
Usada por consumidores de fila. Para cada
get()
usado para buscar uma tarefa, uma chamada subsequente paratask_done()
avisa à fila, que o processamento na tarefa está concluído.Se um
join()
estiver sendo bloqueado no momento, ele irá continuar quando todos os itens tiverem sido processados (significando que uma chamadatask_done()
foi recebida para cada item que foi chamado o métodoput()
para colocar na fila).shutdown(immediate=True)
callstask_done()
for each remaining item in the queue.Levanta
ValueError
se chamada mais vezes do que a quantidade de itens existentes na fila.
Fila de prioridade¶
Filas LIFO (último a entrar, primeiro a sair)¶
Exceções¶
- exception asyncio.QueueEmpty¶
Esta exceção é levantada quando o método
get_nowait()
é chamado em uma fila vazia.
- exception asyncio.QueueFull¶
Exceção levantada quando o método
put_nowait()
é chamado em uma fila que atingiu seu maxsize.
Exemplos¶
Filas podem ser usadas para distribuir cargas de trabalho entre diversas tarefas concorrentes:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())