Filas

Código-fonte: Lib/asyncio/queues.py


Filas asyncio são projetadas para serem similar a classes do módulo queue. Apesar de filas asyncio não serem seguras para thread, elas são projetadas para serem usadas especificamente em código async/await.

Perceba que métodos de filas asyncio não possuem um parâmetro timeout; use a função asyncio.wait_for() para realizar operações de fila com um tempo limite timeout.

Veja também a seção Exemplos abaixo.

Queue

class asyncio.Queue(maxsize=0)

Uma fila onde o primeiro a entrar, é o primeiro a sair (FIFO - First In First Out).

Se maxsize for menor que ou igual a zero, o tamanho da fila é infinito. Se ele for um inteiro maior que 0, então await put() bloqueia quando a fila atingir maxsize até que um item seja removido por get().

Ao contrário da biblioteca padrão de threading queue, o tamanho da fila é sempre conhecido e pode ser obtido através da chamada do método qsize().

Alterado na versão 3.10: Removido o parâmetro loop.

Esta classe não é segura para thread.

maxsize

Número de itens permitidos na fila.

empty()

Retorna True se a fila estiver vazia, False caso contrário.

full()

Retorna True se existem maxsize itens na fila.

Se a fila foi inicializada com maxsize=0 (o padrão), então full() nunca retorna True.

coroutine get()

Remove e retorna um item da fila. Se a fila estiver vazia, aguarda até que um item esteja disponível.

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait()

Retorna um item se houver um imediatamente disponível, caso contrário levanta QueueEmpty.

coroutine join()

Bloqueia até que todos os itens na fila tenham sido recebidos e processados.

A contagem de tarefas inacabadas aumenta sempre que um item é adicionado à fila. A contagem diminui sempre que uma corrotina consumidora chama task_done() para indicar que o item foi recuperado e todo o trabalho nele foi concluído. Quando a contagem de tarefas inacabadas chega a zero, join() desbloqueia.

coroutine put(item)

Coloca um item na fila. Se a fila estiver cheia, aguarda até que uma posição livre esteja disponível antes de adicionar o item.

Raises QueueShutDown if the queue has been shut down.

put_nowait(item)

Coloca um item na fila sem bloqueá-la.

Se nenhuma posição livre estiver imediatamente disponível, levanta QueueFull.

qsize()

Retorna o número de itens na fila.

shutdown(immediate=False)

Shut down the queue, making get() and put() raise QueueShutDown.

By default, get() on a shut down queue will only raise once the queue is empty. Set immediate to true to make get() raise immediately instead.

All blocked callers of put() and get() will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers of join().

Adicionado na versão 3.13.

task_done()

Indica que a tarefa anteriormente enfileirada está concluída.

Usada por consumidores de fila. Para cada get() usado para buscar uma tarefa, uma chamada subsequente para task_done() avisa à fila, que o processamento na tarefa está concluído.

Se um join() estiver sendo bloqueado no momento, ele irá continuar quando todos os itens tiverem sido processados (significando que uma chamada task_done() foi recebida para cada item que foi chamado o método put() para colocar na fila).

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

Levanta ValueError se chamada mais vezes do que a quantidade de itens existentes na fila.

Fila de prioridade

class asyncio.PriorityQueue

Uma variante de Queue; recupera entradas em ordem de prioridade (mais baixas primeiro).

Entradas são tipicamente tuplas no formato (priority_number, data).

Filas LIFO (último a entrar, primeiro a sair)

class asyncio.LifoQueue

Uma variante de Queue que recupera as entradas adicionadas mais recentemente primeiro (último a entrar, primeiro a sair).

Exceções

exception asyncio.QueueEmpty

Esta exceção é levantada quando o método get_nowait() é chamado em uma fila vazia.

exception asyncio.QueueFull

Exceção levantada quando o método put_nowait() é chamado em uma fila que atingiu seu maxsize.

exception asyncio.QueueShutDown

Exception raised when put() or get() is called on a queue which has been shut down.

Adicionado na versão 3.13.

Exemplos

Filas podem ser usadas para distribuir cargas de trabalho entre diversas tarefas concorrentes:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())