Примітиви синхронізації

Вихідний код: Lib/asyncio/locks.py


Примітиви синхронізації asyncio розроблені таким чином, щоб бути подібними до модулів threading з двома важливими застереженнями:

  • примітиви asyncio не є потокобезпечними, тому їх не слід використовувати для синхронізації потоків ОС (для цього використовуйте threading);

  • методи цих примітивів синхронізації не приймають аргумент timeout; використовуйте функцію asyncio.wait_for() для виконання операцій із тайм-аутами.

asyncio має такі базові примітиви синхронізації:


Замок

class asyncio.Lock

Реалізує блокування м’ютексу для асинхронних завдань. Небезпечно для потоків.

Асинхронне блокування можна використовувати, щоб гарантувати ексклюзивний доступ до спільного ресурсу.

Найкращим способом використання Lock є оператор async with:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

що еквівалентно:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Змінено в версії 3.10: Видалено параметр loop.

coroutine acquire()

Придбайте замок.

Цей метод очікує, поки блокування буде розблоковано, встановлює його на заблоковано та повертає True.

Якщо в acquire() заблоковано більше ніж одну співпрограму, яка очікує розблокування блокування, зрештою виконується лише одна співпрограма.

Отримання блокування є чесним: співпрограма, яка продовжується, буде першою співпрограмою, яка почала очікувати на блокування.

release()

Відпустіть замок.

Коли замок заблоковано, скиньте його на розблоковано та поверніться.

Якщо блокування розблоковано, виникає RuntimeError.

locked()

Повертає True, якщо блокування заблоковано.

Подія

class asyncio.Event

Об’єкт події. Небезпечно для потоків.

Асинхронну подію можна використовувати для сповіщення кількох асинхронних завдань про те, що сталася якась подія.

Об’єкт Event керує внутрішнім прапором, який можна встановити на true за допомогою методу set() і скинути на false за допомогою методу clear(). Метод wait() блокується, доки прапор не буде встановлено на true. Спочатку для прапора встановлено значення false.

Змінено в версії 3.10: Видалено параметр loop.

Приклад:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

Дочекайтеся встановлення події.

Якщо подія встановлена, негайно поверніть True. Інакше заблокуйте, доки інше завдання не викличе set().

set()

Встановити подію.

Усі завдання, які очікують встановлення події, будуть негайно активовані.

clear()

Очистити (скасувати) подію.

Завдання, що очікують на wait(), тепер блокуватимуться, доки метод set() не буде викликано знову.

is_set()

Повертає True, якщо подія встановлена.

Хвороба

class asyncio.Condition(lock=None)

Об’єкт Condition. Небезпечно для потоків.

Примітив асинхронної умови може використовуватися завданням для очікування настання певної події та отримання ексклюзивного доступу до спільного ресурсу.

По суті, об’єкт Condition поєднує функціональні можливості Event і Lock. Кілька об’єктів Condition можуть використовувати один Lock, що дозволяє координувати ексклюзивний доступ до спільного ресурсу між різними завданнями, зацікавленими в певних станах цього спільного ресурсу.

Додатковий аргумент lock має бути об’єктом Lock або None. В останньому випадку новий об’єкт Lock створюється автоматично.

Змінено в версії 3.10: Видалено параметр loop.

Найкращим способом використання умови є оператор async with:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

що еквівалентно:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

Отримайте основний замок.

Цей метод чекає, доки базове блокування розблокується, встановлює його на locked і повертає True.

notify(n=1)

Wake up at most n tasks (1 by default) waiting on this condition. The method is no-op if no tasks are waiting.

Блокування має бути отримано перед викликом цього методу та звільнено незабаром після цього. У разі виклику з розблокованим блокуванням виникає помилка RuntimeError.

locked()

Повертає True, якщо основне блокування отримано.

notify_all()

Розбудіть усі завдання, що очікують за цієї умови.

Цей метод діє як notify(), але активує всі завдання, що очікують.

Блокування має бути отримано перед викликом цього методу та звільнено незабаром після цього. У разі виклику з розблокованим блокуванням виникає помилка RuntimeError.

release()

Звільніть базовий замок.

Під час виклику для розблокованого блокування виникає RuntimeError.

coroutine wait()

Дочекайтеся повідомлення.

Якщо завдання, що викликає, не отримало блокування під час виклику цього методу, виникає RuntimeError.

Цей метод знімає основне блокування, а потім блокує, доки його не розбудить виклик notify() або notify_all(). Після пробудження умова знову блокується, і цей метод повертає True.

coroutine wait_for(predicate)

Зачекайте, поки предикат стане true.

The predicate must be a callable which result will be interpreted as a boolean value. The final value is the return value.

Семафор

class asyncio.Semaphore(value=1)

Об’єкт Semaphore. Небезпечно для потоків.

Семафор керує внутрішнім лічильником, який зменшується при кожному виклику acquire() і збільшується при кожному виклику release(). Лічильник ніколи не може опускатися нижче нуля; коли acquire() виявляє, що він дорівнює нулю, він блокується, чекаючи, поки якесь завдання викличе release().

Необов’язковий аргумент value дає початкове значення для внутрішнього лічильника (1 за замовчуванням). Якщо вказане значення менше ніж 0, виникає ValueError.

Змінено в версії 3.10: Видалено параметр loop.

Кращим способом використання семафора є оператор async with:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

що еквівалентно:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

Придбайте семафор.

Якщо внутрішній лічильник більший за нуль, зменште його на одиницю та негайно поверніть True. Якщо він дорівнює нулю, дочекайтеся виклику release() і поверніть True.

locked()

Повертає True, якщо семафор не може бути отриманий негайно.

release()

Відпустіть семафор, збільшивши внутрішній лічильник на одиницю. Може розбудити завдання, яке очікує отримання семафора.

На відміну від BoundedSemaphore, Semaphore дозволяє робити більше викликів release(), ніж викликів acquire().

Обмежений семафор

class asyncio.BoundedSemaphore(value=1)

Обмежений семафорний об’єкт. Небезпечно для потоків.

Обмежений семафор — це версія Semaphore, яка викликає ValueError у release(), якщо він збільшує внутрішній лічильник вище початкового значення.

Змінено в версії 3.10: Видалено параметр loop.

Barrier

class asyncio.Barrier(parties)

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

async with can be used as an alternative to awaiting on wait().

The barrier can be reused any number of times.

Приклад:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

Added in version 3.11.

coroutine wait()

Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is «filling», the number of waiting task decreases by 1.

The return value is an integer in the range of 0 to parties-1, different for each task. This can be used to select a task to do some special housekeeping, e.g.:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

This method may raise a BrokenBarrierError exception if the barrier is broken or reset while a task is waiting. It could raise a CancelledError if a task is cancelled.

coroutine reset()

Return the barrier to the default, empty state. Any tasks waiting on it will receive the BrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

coroutine abort()

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the tasks needs to abort, to avoid infinite waiting tasks.

parties

The number of tasks required to pass the barrier.

n_waiting

The number of tasks currently waiting in the barrier while filling.

broken

Логічне значення, яке має значення True, якщо бар’єр знаходиться в зламаному стані.

exception asyncio.BrokenBarrierError

Цей виняток, підклас RuntimeError, виникає, коли об’єкт Barrier скидається або зламано.


Змінено в версії 3.9: Отримання блокування за допомогою оператора await lock або yield from lock та/або with (with await lock, with (yield from lock)) було видалено . Натомість використовуйте async with lock.