Files d'attente (queues)¶
Code source : Lib/asyncore.py
Les files d'attente asyncio sont conçues pour être similaires aux classes du module queue
. Bien que les files d'attente asyncio ne soient pas compatibles avec les programmes à multiples fils d'exécution, elles sont conçues pour être utilisées spécifiquement dans le code async/await.
Notez que les méthodes des files d'attente asyncio n'ont pas de paramètre timeout ; utilisez la fonction asyncio.wait_for()
pour effectuer des opérations de file d'attente avec un délai d'attente.
Voir également la section Exemples ci-dessous.
File d'attente (queue)¶
- class asyncio.Queue(maxsize=0)¶
File d'attente premier entré, premier sorti (FIFO pour fist in, first out).
Si maxsize est inférieur ou égal à zéro, la taille de la file d'attente est infinie. Si c'est un entier supérieur à
0
, alorsawait put()
se bloque lorsque la file d'attente atteint maxsize jusqu'à ce qu'un élément soit supprimé parget()
.Contrairement à la bibliothèque standard multi-fils
queue
, la taille de la file d'attente est toujours connue et peut être renvoyée en appelant la méthodeqsize()
.Modifié dans la version 3.10: suppression du paramètre loop.
Cette classe n'est pas compatible avec les fils d'exécution multiples.
- maxsize¶
Nombre d'éléments autorisés dans la file d'attente.
- empty()¶
Renvoie
True
si la file d'attente est vide,False
sinon.
- full()¶
Renvoie
True
s'il y amaxsize
éléments dans la file d'attente.If the queue was initialized with
maxsize=0
(the default), thenfull()
never returnsTrue
.
- coroutine get()¶
Supprime et renvoie un élément de la file d'attente. Si la file d'attente est vide, attend qu'un élément soit disponible.
- get_nowait()¶
Renvoie un élément s'il y en a un immédiatement disponible, sinon lève
QueueEmpty
.
- coroutine join()¶
Bloque jusqu'à ce que tous les éléments de la file d'attente aient été récupérés et traités.
Le nombre de tâches inachevées augmente chaque fois qu'un élément est ajouté à la file. Ce nombre diminue chaque fois qu'un fil d'exécution consommateur appelle
task_done()
pour indiquer que l'élément a été extrait et que tout le travail à effectuer dessus est terminé. Lorsque le nombre de tâches non terminées devient nul,join()
débloque.
- coroutine put(item)¶
Met un élément dans la file d'attente. Si la file d'attente est pleine, attend qu'un emplacement libre soit disponible avant d'ajouter l'élément.
- put_nowait(item)¶
Ajoute un élément dans la file d'attente sans bloquer.
Si aucun emplacement libre n'est immédiatement disponible, lève
QueueFull
.
- qsize()¶
Renvoie le nombre d'éléments dans la file d'attente.
- task_done()¶
Indicate that a formerly enqueued work item is complete.
Used by queue consumers. For each
get()
used to fetch a work item, a subsequent call totask_done()
tells the queue that the processing on the work item is complete.Si un
join()
est actuellement bloquant, on reprendra lorsque tous les éléments auront été traités (ce qui signifie qu'un appel àtask_done()
a été effectué pour chaque élément qui a étéput()
dans la file).Lève une exception
ValueError
si elle est appelée plus de fois qu'il n'y avait d'éléments dans la file.
File avec priorité¶
Pile (LIFO)¶
Exceptions¶
- exception asyncio.QueueEmpty¶
Cette exception est levée lorsque la méthode
get_nowait()
est appelée sur une file d'attente vide.
- exception asyncio.QueueFull¶
Exception levée lorsque la méthode
put_nowait()
est appelée sur une file d'attente qui a atteint sa maxsize.
Exemples¶
Les files d'attente peuvent être utilisées pour répartir la charge de travail entre plusieurs tâches simultanées :
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())