Filas

Código-fonte: Lib/asyncio/queues.py


Filas asyncio são projetadas para serem similar a classes do módulo queue. Apesar de filas asyncio não serem seguras para thread, elas são projetadas para serem usadas especificamente em código async/await.

Perceba que métodos de filas asyncio não possuem um parâmetro timeout; use a função asyncio.wait_for() para realizar operações de fila com um tempo limite timeout.

Veja também a seção Exemplos abaixo.

Queue

class asyncio.Queue(maxsize=0)

Uma fila onde o primeiro a entrar, é o primeiro a sair (FIFO - First In First Out).

Se maxsize for menor que ou igual a zero, o tamanho da fila é infinito. Se ele for um inteiro maior que 0, então await put() bloqueia quando a fila atingir maxsize até que um item seja removido por get().

Ao contrário da biblioteca padrão de threading queue, o tamanho da fila é sempre conhecido e pode ser obtido através da chamada do método qsize().

Alterado na versão 3.10: Removido o parâmetro loop.

Esta classe não é segura para thread.

maxsize

Número de itens permitidos na fila.

empty()

Retorna True se a fila estiver vazia, False caso contrário.

full()

Retorna True se existem maxsize itens na fila.

Se a fila foi inicializada com maxsize=0 (o padrão), então full() nunca retorna True.

coroutine get()

Remove e retorna um item da fila. Se a fila estiver vazia, aguarda até que um item esteja disponível.

get_nowait()

Retorna um item se houver um imediatamente disponível, caso contrário levanta QueueEmpty.

coroutine join()

Bloqueia até que todos os itens na fila tenham sido recebidos e processados.

A contagem de tarefas inacabadas aumenta sempre que um item é adicionado à fila. A contagem diminui sempre que uma corrotina consumidora chama task_done() para indicar que o item foi recuperado e todo o trabalho nele foi concluído. Quando a contagem de tarefas inacabadas chega a zero, join() desbloqueia.

coroutine put(item)

Coloca um item na fila. Se a fila estiver cheia, aguarda até que uma posição livre esteja disponível antes de adicionar o item.

put_nowait(item)

Coloca um item na fila sem bloqueá-la.

Se nenhuma posição livre estiver imediatamente disponível, levanta QueueFull.

qsize()

Retorna o número de itens na fila.

task_done()

Indica que a tarefa anteriormente enfileirada está concluída.

Usada por consumidores de fila. Para cada get() usado para buscar uma tarefa, uma chamada subsequente para task_done() avisa à fila, que o processamento na tarefa está concluído.

Se um join() estiver sendo bloqueado no momento, ele irá continuar quando todos os itens tiverem sido processados (significando que uma chamada task_done() foi recebida para cada item que foi chamado o método put() para colocar na fila).

Levanta ValueError se chamada mais vezes do que a quantidade de itens existentes na fila.

Fila de prioridade

class asyncio.PriorityQueue

Uma variante de Queue; recupera entradas em ordem de prioridade (mais baixas primeiro).

Entradas são tipicamente tuplas no formato (priority_number, data).

Filas LIFO (último a entrar, primeiro a sair)

class asyncio.LifoQueue

Uma variante de Queue que recupera as entradas adicionadas mais recentemente primeiro (último a entrar, primeiro a sair).

Exceções

exception asyncio.QueueEmpty

Esta exceção é levantada quando o método get_nowait() é chamado em uma fila vazia.

exception asyncio.QueueFull

Exceção levantada quando o método put_nowait() é chamado em uma fila que atingiu seu maxsize.

Exemplos

Filas podem ser usadas para distribuir cargas de trabalho entre diversas tarefas concorrentes:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())