Files d'attente (queues)¶
Code source : Lib/asyncore.py
Les files d'attente asyncio sont conçues pour être similaires aux classes du module queue
. Bien que les files d'attente asyncio ne soient pas compatibles avec les programmes à multiples fils d'exécution, elles sont conçues pour être utilisées spécifiquement dans le code async/await.
Notez que les méthodes des files d'attente asyncio n'ont pas de paramètre timeout ; utilisez la fonction asyncio.wait_for()
pour effectuer des opérations de file d'attente avec un délai d'attente.
Voir également la section Exemples ci-dessous.
File d'attente (queue)¶
- class asyncio.Queue(maxsize=0)¶
File d'attente premier entré, premier sorti (FIFO pour fist in, first out).
Si maxsize est inférieur ou égal à zéro, la taille de la file d'attente est infinie. Si c'est un entier supérieur à
0
, alorsawait put()
se bloque lorsque la file d'attente atteint maxsize jusqu'à ce qu'un élément soit supprimé parget()
.Contrairement à la bibliothèque standard multi-fils
queue
, la taille de la file d'attente est toujours connue et peut être renvoyée en appelant la méthodeqsize()
.Modifié dans la version 3.10: suppression du paramètre loop.
Cette classe n'est pas compatible avec les fils d'exécution multiples.
- maxsize¶
Nombre d'éléments autorisés dans la file d'attente.
- empty()¶
Renvoie
True
si la file d'attente est vide,False
sinon.
- full()¶
Renvoie
True
s'il y amaxsize
éléments dans la file d'attente.Si la file d'attente a été initialisée avec
maxsize=0
(la valeur par défaut), alorsfull()
ne renvoie jamaisTrue
.
- coroutine get()¶
Supprime et renvoie un élément de la file d'attente. Si la file d'attente est vide, attend qu'un élément soit disponible.
- get_nowait()¶
Renvoie un élément s'il y en a un immédiatement disponible, sinon lève
QueueEmpty
.
- coroutine join()¶
Bloque jusqu'à ce que tous les éléments de la file d'attente aient été récupérés et traités.
Le nombre de tâches inachevées augmente chaque fois qu'un élément est ajouté à la file. Ce nombre diminue chaque fois qu'un fil d'exécution consommateur appelle
task_done()
pour indiquer que l'élément a été extrait et que tout le travail à effectuer dessus est terminé. Lorsque le nombre de tâches non terminées devient nul,join()
débloque.
- coroutine put(item)¶
Met un élément dans la file d'attente. Si la file d'attente est pleine, attend qu'un emplacement libre soit disponible avant d'ajouter l'élément.
- put_nowait(item)¶
Ajoute un élément dans la file d'attente sans bloquer.
Si aucun emplacement libre n'est immédiatement disponible, lève
QueueFull
.
- qsize()¶
Renvoie le nombre d'éléments dans la file d'attente.
- task_done()¶
Indique qu'une tâche précédemment mise en file d'attente est terminée.
Utilisé par les consommateurs de file d'attente. Pour chaque
get()
utilisé pour récupérer une tâche, un appel ultérieur àtask_done()
indique à la file d'attente que le traitement de la tâche est terminé.Si un
join()
est actuellement bloquant, on reprendra lorsque tous les éléments auront été traités (ce qui signifie qu'un appel àtask_done()
a été effectué pour chaque élément qui a étéput()
dans la file).Lève une exception
ValueError
si elle est appelée plus de fois qu'il n'y avait d'éléments dans la file.
File avec priorité¶
Pile (LIFO)¶
Exceptions¶
- exception asyncio.QueueEmpty¶
Cette exception est levée lorsque la méthode
get_nowait()
est appelée sur une file d'attente vide.
- exception asyncio.QueueFull¶
Exception levée lorsque la méthode
put_nowait()
est appelée sur une file d'attente qui a atteint sa maxsize.
Exemples¶
Les files d'attente peuvent être utilisées pour répartir la charge de travail entre plusieurs tâches simultanées :
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())