Primitives de synchronisation

Code source : Lib/asyncio/locks.py


Les primitives de synchronisation asyncio sont conçues pour être similaires à celles du module threading avec deux mises en garde importantes :

  • les primitives asyncio ne sont pas thread-safe, elles ne doivent donc pas être utilisées pour la synchronisation des fils d'exécution du système d'exploitation (utilisez threading pour cela) ;

  • les méthodes de ces primitives de synchronisation n'acceptent pas l'argument timeout ; utilisez la fonction asyncio.wait_for() pour effectuer des opérations avec des délais d'attente.

asyncio possède les primitives de synchronisation de base suivantes :


Verrou (lock)

class asyncio.Lock

Implémente un verrou exclusif (mutex) pour les tâches asynchrones. Ce n'est pas compatible avec les programmes à fils d'exécution multiples.

Un verrou asyncio peut être utilisé pour garantir un accès exclusif à une ressource partagée.

La meilleure façon d'utiliser un verrou est une instruction async with

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

ce qui équivaut à :

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Modifié dans la version 3.10: suppression du paramètre loop.

coroutine acquire()

Verrouille (ou acquiert) le verrou.

Cette méthode attend que le verrou soit déverrouillé (unlocked), le verrouille (positionné sur locked) et renvoie True.

Lorsque plus d'une coroutine est bloquée dans acquire() en attendant que le verrou soit déverrouillé, seule une coroutine continue finalement.

L'acquisition d'un verrou est équitable : la coroutine qui acquiert le verrou est celle qui était la première à attendre le verrou.

release()

Libère le verrou.

Lorsque le verrou est verrouillé, le déverrouille et termine.

Si le verrou est déjà déverrouillé, une RuntimeError est levée.

locked()

Renvoie True si le verrou est verrouillé.

Événement (Event)

class asyncio.Event

Objet événement. Non compatible avec les programmes à plusieurs fils d'exécution.

Un événement asynchrone peut être utilisé pour notifier plusieurs tâches asynchrones qu'un événement s'est produit.

Un objet Event gère un drapeau interne qui peut être activé (ou mis à vrai) avec la méthode set() et désactivé (ou mis à faux) avec la méthode clear(). La méthode wait() se bloque jusqu'à ce que l'indicateur soit activé. L'indicateur est initialement désactivé.

Modifié dans la version 3.10: suppression du paramètre loop.

Exemple :

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

Attend que l'évènement soit activé.

Si l'événement est activé (vrai), renvoie True immédiatement. Sinon bloque jusqu'à ce qu'une autre tâche appelle set().

set()

Active l'événement.

Toutes les tâches en attente de l'événement sont immédiatement réveillées.

clear()

Efface (désactive) l'événement.

Les tâches en attente sur wait() seront désormais bloquées jusqu'à ce que la méthode set() soit à nouveau appelée.

is_set()

Renvoie True si l'évènement est actif.

Condition

class asyncio.Condition(lock=None)

Objet Condition. Non compatible avec les programmes à plusieurs fils d'exécution.

Une primitive de condition asynchrone peut être utilisée par une tâche pour attendre qu'un événement se produise, puis obtenir un accès exclusif à une ressource partagée.

Essentiellement, un objet Condition combine les fonctionnalités d'un Event et d'un Lock. Il est possible que plusieurs objets Condition partagent un seul verrou, ce qui permet de coordonner l'accès exclusif à une ressource partagée entre différentes tâches intéressées par des états particuliers de cette ressource partagée.

L'argument optionnel lock doit être un objet Lock ou None. Dans ce dernier cas, un nouvel objet Lock est créé automatiquement.

Modifié dans la version 3.10: suppression du paramètre loop.

La meilleure façon d'utiliser une Condition est une instruction async with

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

ce qui équivaut à :

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

Verrouille le verrou sous-jacent.

Cette méthode attend que le verrou sous-jacent soit déverrouillé, le verrouille et renvoie True.

notify(n=1)

Réveille au plus n tâches (1 par défaut) en attente de cette condition. La méthode ne fait rien si aucune tâche n'est en attente.

Le verrou doit être verrouillé avant que cette méthode ne soit appelée et libéré peu de temps après. S'il est appelé avec un verrou déverrouillé, une erreur RuntimeError est levée.

locked()

Renvoie True si le verrou sous-jacent est verrouillé.

notify_all()

Réveille toutes les tâches en attente sur cette condition.

Cette méthode agit comme notify(), mais réveille toutes les tâches en attente.

Le verrou doit être verrouillé avant que cette méthode ne soit appelée et libéré peu de temps après. S'il est appelé avec un verrou déverrouillé, une erreur RuntimeError est levée.

release()

Libère le verrou sous-jacent.

Lorsqu'elle est invoquée sur un verrou déverrouillé, une RuntimeError est levée.

coroutine wait()

Attend d'être notifié.

Si la tâche appelante n'a pas verrouillé le verrou lorsque cette méthode est appelée, une RuntimeError est levée.

Cette méthode libère le verrou sous-jacent, puis se bloque jusqu'à ce qu'elle soit réveillée par un appel notify() ou notify_all(). Une fois réveillée, la Condition verrouille à nouveau son verrou et cette méthode renvoie True.

coroutine wait_for(predicate)

Attend jusqu'à ce qu'un prédicat devienne vrai.

The predicate must be a callable which result will be interpreted as a boolean value. The method will repeatedly wait() until the predicate evaluates to true. The final value is the return value.

Sémaphore

class asyncio.Semaphore(value=1)

Objet Sémaphore. Non compatible avec les programmes à plusieurs fils d'exécution.

Un sémaphore gère un compteur interne qui est décrémenté à chaque appel acquire() et incrémenté à chaque appel release(). Le compteur ne peut jamais descendre en dessous de zéro ; quand acquire() trouve qu'il est égal à zéro, il se bloque, en attendant qu'une tâche appelle release().

L'argument optionnel value donne la valeur initiale du compteur interne (1 par défaut). Si la valeur donnée est inférieure à 0 une ValueError est levée.

Modifié dans la version 3.10: suppression du paramètre loop.

La meilleure façon d'utiliser un sémaphore est une instruction async with

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

ce qui équivaut à :

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

Acquiert un sémaphore.

Si le compteur interne est supérieur à zéro, le décrémente d'une unité et renvoie True immédiatement. Si c'est zéro, attend que release() soit appelée et renvoie True.

locked()

Renvoie True si le sémaphore ne peut pas être acquis immédiatement.

release()

Relâche un sémaphore, incrémentant le compteur interne d'une unité. Peut réveiller une tâche en attente d'acquisition du sémaphore.

Contrairement à BoundedSemaphore, Semaphore permet de faire plus d'appels release() que d'appels acquire().

Sémaphore capé (BoundedSemaphore)

class asyncio.BoundedSemaphore(value=1)

Objet sémaphore capé. Non compatible avec les programmes à plusieurs fils d'exécution.

Bounded Semaphore est une version de Semaphore qui lève une ValueError dans release() s'il augmente le compteur interne au-dessus de la value initiale.

Modifié dans la version 3.10: suppression du paramètre loop.

Barrière (Barrier)

class asyncio.Barrier(parties)

Objet barrière. Non compatible avec les programmes à plusieurs fils d'exécution.

Une barrière est une simple primitive de synchronisation qui permet de bloquer jusqu'à ce que parties tâches l'attendent. Les tâches attendent sur la méthode wait() et sont bloquées jusqu'à ce que le nombre spécifié de tâches attendent sur wait(). À ce stade, toutes les tâches en attente se débloquent simultanément.

async with peut être utilisé comme alternative à l'attente sur wait().

La barrière peut être réutilisée un nombre illimité de fois.

Exemple :

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Le résultat de cet exemple est :

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

Ajouté dans la version 3.11.

coroutine wait()

Passe la barrière. Lorsque toutes les tâches bloquées à la barrière ont appelé cette fonction, elles sont toutes débloquées simultanément.

Lorsqu'une tâche en attente ou bloquée à la barrière est annulée, cette tâche sort de la barrière qui reste dans le même état. Si la barrière est en cours de « remplissage », le nombre de tâche en attente diminue de 1.

La valeur de retour est un entier compris entre 0 et parties-1, différent pour chaque tâche. Cela peut être utilisé pour sélectionner une tâche qui fera du ménage, par exemple :

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

Cette méthode peut lever une exception BrokenBarrierError si la barrière est brisée ou réinitialisée alors qu'une tâche est en attente. Cela peut lever une CancelledError si une tâche est annulée.

coroutine reset()

Ramène la barrière à l'état vide par défaut. Toutes les tâches en attente reçoivent l'exception BrokenBarrierError.

Si une barrière est brisée, il peut être préférable de la quitter et d'en créer une nouvelle.

coroutine abort()

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the tasks needs to abort, to avoid infinite waiting tasks.

parties

Le nombre de tâches nécessaires pour franchir la barrière.

n_waiting

Le nombre de tâches actuellement en attente à la barrière pendant le remplissage.

broken

Booléen qui vaut True si la barrière est rompue.

exception asyncio.BrokenBarrierError

Cette exception, une sous-classe de RuntimeError, est déclenchée lorsque l'objet Barrier est réinitialisé ou cassé.


Modifié dans la version 3.9: l'acquisition d'un verrou en utilisant wait lock ou yield from lock ou with (with await lock, with (yield from lock)) a été supprimée. Utilisez async with lock à la place.