Filas¶
Código-fonte: Lib/asyncio/queues.py
Filas asyncio são projetadas para serem similar a classes do módulo queue
. Apesar de filas asyncio não serem seguras para thread, elas são projetadas para serem usadas especificamente em código async/await.
Perceba que métodos de filas asyncio não possuem um parâmetro timeout; use a função asyncio.wait_for()
para realizar operações de fila com um tempo limite timeout.
Veja também a seção Exemplos abaixo.
Queue¶
- class asyncio.Queue(maxsize=0)¶
Uma fila onde o primeiro a entrar, é o primeiro a sair (FIFO - First In First Out).
Se maxsize for menor que ou igual a zero, o tamanho da fila é infinito. Se ele for um inteiro maior que
0
, entãoawait put()
bloqueia quando a fila atingir maxsize até que um item seja removido porget()
.Ao contrário da biblioteca padrão de threading
queue
, o tamanho da fila é sempre conhecido e pode ser obtido através da chamada do métodoqsize()
.Alterado na versão 3.10: Removido o parâmetro loop.
Esta classe não é segura para thread.
- maxsize¶
Número de itens permitidos na fila.
- empty()¶
Retorna
True
se a fila estiver vazia,False
caso contrário.
- full()¶
Retorna
True
se existemmaxsize
itens na fila.Se a fila foi inicializada com
maxsize=0
(o padrão), entãofull()
nunca retornaTrue
.
- coroutine get()¶
Remove e retorna um item da fila. Se a fila estiver vazia, aguarda até que um item esteja disponível.
- get_nowait()¶
Retorna um item se houver um imediatamente disponível, caso contrário levanta
QueueEmpty
.
- coroutine join()¶
Bloqueia até que todos os itens na fila tenham sido recebidos e processados.
A contagem de tarefas inacabadas aumenta sempre que um item é adicionado à fila. A contagem diminui sempre que uma corrotina consumidora chama
task_done()
para indicar que o item foi recuperado e todo o trabalho nele foi concluído. Quando a contagem de tarefas inacabadas chega a zero,join()
desbloqueia.
- coroutine put(item)¶
Coloca um item na fila. Se a fila estiver cheia, aguarda até que uma posição livre esteja disponível antes de adicionar o item.
- put_nowait(item)¶
Coloca um item na fila sem bloqueá-la.
Se nenhuma posição livre estiver imediatamente disponível, levanta
QueueFull
.
- qsize()¶
Retorna o número de itens na fila.
- task_done()¶
Indica que a tarefa anteriormente enfileirada está concluída.
Usada por consumidores de fila. Para cada
get()
usado para buscar uma tarefa, uma chamada subsequente paratask_done()
avisa à fila, que o processamento na tarefa está concluído.Se um
join()
estiver sendo bloqueado no momento, ele irá continuar quando todos os itens tiverem sido processados (significando que uma chamadatask_done()
foi recebida para cada item que foi chamado o métodoput()
para colocar na fila).Levanta
ValueError
se chamada mais vezes do que a quantidade de itens existentes na fila.
Fila de prioridade¶
Filas LIFO (último a entrar, primeiro a sair)¶
Exceções¶
- exception asyncio.QueueEmpty¶
Esta exceção é levantada quando o método
get_nowait()
é chamado em uma fila vazia.
- exception asyncio.QueueFull¶
Exceção levantada quando o método
put_nowait()
é chamado em uma fila que atingiu seu maxsize.
Exemplos¶
Filas podem ser usadas para distribuir cargas de trabalho entre diversas tarefas concorrentes:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())