Files d'attente (queues)

Code source : Lib/asyncore.py


Les files d'attente asyncio sont conçues pour être similaires aux classes du module queue. Bien que les files d'attente asyncio ne soient pas compatibles avec les programmes à multiples fils d'exécution, elles sont conçues pour être utilisées spécifiquement dans le code async/await.

Notez que les méthodes des files d'attente asyncio n'ont pas de paramètre timeout ; utilisez la fonction asyncio.wait_for() pour effectuer des opérations de file d'attente avec un délai d'attente.

Voir également la section Exemples ci-dessous.

File d'attente (queue)

class asyncio.Queue(maxsize=0)

File d'attente premier entré, premier sorti (FIFO pour fist in, first out).

Si maxsize est inférieur ou égal à zéro, la taille de la file d'attente est infinie. Si c'est un entier supérieur à 0, alors await put() se bloque lorsque la file d'attente atteint maxsize jusqu'à ce qu'un élément soit supprimé par get().

Contrairement à la bibliothèque standard multi-fils queue, la taille de la file d'attente est toujours connue et peut être renvoyée en appelant la méthode qsize().

Modifié dans la version 3.10: suppression du paramètre loop.

Cette classe n'est pas compatible avec les fils d'exécution multiples.

maxsize

Nombre d'éléments autorisés dans la file d'attente.

empty()

Renvoie True si la file d'attente est vide, False sinon.

full()

Renvoie True s'il y a maxsize éléments dans la file d'attente.

Si la file d'attente a été initialisée avec maxsize=0 (la valeur par défaut), alors full() ne renvoie jamais True.

coroutine get()

Supprime et renvoie un élément de la file d'attente. Si la file d'attente est vide, attend qu'un élément soit disponible.

get_nowait()

Renvoie un élément s'il y en a un immédiatement disponible, sinon lève QueueEmpty.

coroutine join()

Bloque jusqu'à ce que tous les éléments de la file d'attente aient été récupérés et traités.

Le nombre de tâches inachevées augmente chaque fois qu'un élément est ajouté à la file. Ce nombre diminue chaque fois qu'un fil d'exécution consommateur appelle task_done() pour indiquer que l'élément a été extrait et que tout le travail à effectuer dessus est terminé. Lorsque le nombre de tâches non terminées devient nul, join() débloque.

coroutine put(item)

Met un élément dans la file d'attente. Si la file d'attente est pleine, attend qu'un emplacement libre soit disponible avant d'ajouter l'élément.

put_nowait(item)

Ajoute un élément dans la file d'attente sans bloquer.

Si aucun emplacement libre n'est immédiatement disponible, lève QueueFull.

qsize()

Renvoie le nombre d'éléments dans la file d'attente.

task_done()

Indique qu'une tâche précédemment mise en file d'attente est terminée.

Utilisé par les consommateurs de file d'attente. Pour chaque get() utilisé pour récupérer une tâche, un appel ultérieur à task_done() indique à la file d'attente que le traitement de la tâche est terminé.

Si un join() est actuellement bloquant, on reprendra lorsque tous les éléments auront été traités (ce qui signifie qu'un appel à task_done() a été effectué pour chaque élément qui a été put() dans la file).

Lève une exception ValueError si elle est appelée plus de fois qu'il n'y avait d'éléments dans la file.

File avec priorité

class asyncio.PriorityQueue

Une variante de Queue ; récupère les entrées par ordre de priorité (la plus basse en premier).

Les entrées sont généralement des n-uplets de la forme (priority_number, data).

Pile (LIFO)

class asyncio.LifoQueue

Une variante de Queue qui récupère en premier les entrées les plus récemment ajoutées (dernier entré, premier sorti).

Exceptions

exception asyncio.QueueEmpty

Cette exception est levée lorsque la méthode get_nowait() est appelée sur une file d'attente vide.

exception asyncio.QueueFull

Exception levée lorsque la méthode put_nowait() est appelée sur une file d'attente qui a atteint sa maxsize.

Exemples

Les files d'attente peuvent être utilisées pour répartir la charge de travail entre plusieurs tâches simultanées :

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())