同期プリミティブ

ソースコード: Lib/asyncio/locks.py


asyncio の同期プリミティブは threading モジュールのそれと類似するようにデザインされていますが、2つの重要な注意事項があります:

  • asyncio の同期プリミティブはスレッドセーフではありません。従って OS スレッドの同期に使うべきではありません (代わりに threading を使ってください);

  • 同期プリミティブのメソッドは timeout 引数を受け付けません; タイムアウトを伴う操作を実行するには asyncio.wait_for() 関数を使ってください。

asyncio モジュールは以下の基本的な同期プリミティブを持っています:


Lock

class asyncio.Lock

asyncio タスクのためのミューテックスロックを実装しています。スレッドセーフではありません。

asyncio ロックは、共有リソースに対する排他的なアクセスを保証するために使うことができます。

Lock の望ましい使用方法は、 async with 文と組み合わせて使うことです:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

これは以下のコードと等価です:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

バージョン 3.10 で変更: loop パラメータが削除されました。

coroutine acquire()

ロックを獲得します。

このメソッドはロックが 解除される まで待機し、ロックを ロック状態 に変更して True を返します。

複数のコルーチンが acquire() メソッドによりロックの解除を待ち受けている場合、最終的にただひとつのコルーチンが実行されます。

ロックの獲得は 公平 です: すなわちロックを獲得して実行されるコルーチンは、最初にロックの待ち受けを開始したコルーチンです。

release()

ロックを解放します。

ロックが ロック状態 の場合、 ロックを 解除状態 にしてリターンします。

ロックが 解除状態 の場合、 RuntimeError 例外が送出されます。

locked()

ロック状態 の場合に True を返します。

Event

class asyncio.Event

イベントオブジェクトです。スレッドセーフではありません。

asyncio イベントは、複数の asyncio タスクに対して何らかのイベントが発生したことを通知するために使うことができます。

Event オブジェクトは内部フラグを管理します。フラグの値は set() メソッドにより true に、また clear() メソッドにより false に設定することができます。 wait() メソッドはフラグが true になるまで処理をブロックします。フラグの初期値は false です。

バージョン 3.10 で変更: loop パラメータが削除されました。

以下はプログラム例です:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

イベントがセットされるまで待機します。

イベントがセットされると、即座に True を返します。 そうでなければ、他のタスクが set() メソッドを呼び出すまで処理をブロックします。

set()

イベントをセットします。

イベントがセットされるまで待機している全てのタスクは、即座に通知を受けて実行を再開します。

clear()

イベントをクリア (アンセット) します

wait() メソッドで待ち受けを行うタスクは set() メソッドが再度呼び出されるまで処理をブロックします。

is_set()

イベントがセットされている場合 True を返します。

Condition

class asyncio.Condition(lock=None)

条件変数オブジェクトです。スレッドセーフではありません。

asyncio 条件プリミティブは何らかのイベントが発生するのを待ち受け、そのイベントを契機として共有リソースへの排他的なアクセスを得るために利用することができます。

本質的に、 Condition オブジェクトは Event と a Lock の2つのクラスの機能を組み合わせたものです。複数の Condition オブジェクトが単一の Lock を共有することでができます。これにより、共有リソースのそれぞれの状態に関連する異なるタスクの間で、そのリソースへの排他的アクセスを調整することが可能になります。

オプション引数 lockLock または None でなければなりません。後者の場合自動的に新しい Lock オブジェクトが生成されます。

バージョン 3.10 で変更: loop パラメータが削除されました。

Condition の望ましい使用方法は async with 文と組み合わせて使うことです:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

これは以下のコードと等価です:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

下層でのロックを獲得します。

このメソッドは下層のロックが 解除される まで待機し、ロックを ロック状態 に変更して True を返します。

notify(n=1)

この条件を待ち受けている最大で n 個のタスク (n のデフォルト値は 1) を起動します。待ち受けているタスクがいない場合、このメソッドは何もしません。

このメソッドが呼び出される前にロックを獲得しておかなければなりません。また、メソッド呼び出し後速やかにロックを解除しなければなりません。 解除された ロックとと共に呼び出された場合、 RuntimeError 例外が送出されます。

locked()

下層のロックを獲得していれば True を返します。

notify_all()

この条件を待ち受けている全てのタスクを起動します。

このメソッドは notify() と同じように振る舞いますが、待ち受けている全てのタスクを起動します。

このメソッドが呼び出される前にロックを獲得しておかなければなりません。また、メソッド呼び出し後速やかにロックを解除しなければなりません。 解除された ロックとと共に呼び出された場合、 RuntimeError 例外が送出されます。

release()

下層のロックを解除します。

アンロック状態のロックに対して呼び出された場合、RuntimeError が送出されます。

coroutine wait()

通知を受けるまで待機します。

このメソッドが呼び出された時点で呼び出し元のタスクがロックを獲得していない場合、 RuntimeError 例外が送出されます。

このメソッドは下層のロックを解除し、その後 notify() または notify_all() の呼び出しによって起動されるまで処理をブロックします。いったん起動されると、 Condition は再びロックを獲得し、メソッドは True を返します。

coroutine wait_for(predicate)

引数 predicate の条件が になるまで待機します。

引数 predicate は戻り値が真偽地として解釈可能な呼び出し可能オブジェクトでなければなりません。 predicate の最終的な値が戻り値になります。

Semaphore

class asyncio.Semaphore(value=1)

セマフォオブジェクトです。スレッドセーフではありません。

セマフォは内部のカウンターを管理しています。カウンターは acquire() メソッドの呼び出しによって減算され、 release() メソッドの呼び出しによって加算されます。カウンターがゼロを下回ることはありません。 acquire() メソッドが呼び出された時にカウンターがゼロになっていると、セマフォは処理をブロックし、他のタスクが release() メソッドを呼び出すまで待機します。

オプション引数 value は内部カウンターの初期値を与えます (デフォルトは 1 です)。 指定された値が 0 より小さい場合、 ValueError 例外が送出されます。

バージョン 3.10 で変更: loop パラメータが削除されました。

セマフォの望ましい使用方法は、 async with 文と組み合わせて使うことです:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

これは以下のコードと等価です:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

セマフォを獲得します。

内部カウンターがゼロより大きい場合、カウンターを1つ減算して即座に True を返します。内部カウンターがゼロの場合、 release() が呼び出されるまで待機してから True を返します。

locked()

セマフォを直ちに獲得できない場合 True を返します。

release()

セマフォを解放し、内部カウンターを1つ加算します。セマフォ待ちをしているタスクを起動する可能性があります。

BoundedSemaphore と異なり、 Semaphorerelease()acquire() よりも多く呼び出すことを許容します。

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

有限セマフォオブジェクトです。スレッドセーフではありません。

有限セマフォは Semaphore の一種で、 release() メソッドの呼び出しにより内部カウンターが 初期値 よりも増加してしまう場合は ValueError 例外を送出します。

バージョン 3.10 で変更: loop パラメータが削除されました。

Barrier

class asyncio.Barrier(parties)

A barrier object. Not thread-safe.

A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the wait() method and would be blocked until the specified number of tasks end up waiting on wait(). At that point all of the waiting tasks would unblock simultaneously.

async with can be used as an alternative to awaiting on wait().

The barrier can be reused any number of times.

以下はプログラム例です:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Result of this example is:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

バージョン 3.11 で追加.

coroutine wait()

Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.

When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is "filling", the number of waiting task decreases by 1.

The return value is an integer in the range of 0 to parties-1, different for each task. This can be used to select a task to do some special housekeeping, e.g.:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

This method may raise a BrokenBarrierError exception if the barrier is broken or reset while a task is waiting. It could raise a CancelledError if a task is cancelled.

coroutine reset()

Return the barrier to the default, empty state. Any tasks waiting on it will receive the BrokenBarrierError exception.

If a barrier is broken it may be better to just leave it and create a new one.

coroutine abort()

Put the barrier into a broken state. This causes any active or future calls to wait() to fail with the BrokenBarrierError. Use this for example if one of the tasks needs to abort, to avoid infinite waiting tasks.

parties

The number of tasks required to pass the barrier.

n_waiting

The number of tasks currently waiting in the barrier while filling.

broken

バリアが broken な状態である場合に True となるブール値。

exception asyncio.BrokenBarrierError

Barrier オブジェクトがリセットされるか broken な場合に、この例外 (RuntimeError のサブクラス) が送出されます。


バージョン 3.9 で変更: await lockyield from lock およびそれらと with 文との組み合わせ (すなわち with await lockwith (yield from lock)) によるロックの獲得は削除されました。代わりに async with lock を使ってください。