同步化原始物件 (Synchronization Primitives)

原始碼:Lib/asyncio/locks.py


asyncio 的同步化原始物件被設計成和那些 threading 模組 (module) 中的同名物件相似,但有兩個重要的限制條件:

  • asyncio 原始物件並不支援執行緒安全 (thread-safe),因此他們不可被用於 OS 執行緒同步化(請改用 threading);

  • 這些同步化原始物件的方法 (method) 並不接受 timeout 引數;要達成有超時 (timeout) 設定的操作請改用 asyncio.wait_for() 函式。

asyncio 有以下基礎同步化原始物件:


Lock

class asyncio.Lock

實作了一個給 asyncio 任務 (task) 用的互斥鎖 (mutex lock)。不支援執行緒安全。

一個 asyncio 的鎖可以用來確保一個共享資源的存取權被獨佔。

使用 Lock 的推薦方式是透過 async with 陳述式:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

這等價於:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

在 3.10 版的變更: 移除 loop 參數。

coroutine acquire()

獲得鎖。

此方法會持續等待直到鎖的狀態成為 unlocked,並將其設置為 locked 和回傳 True

當多於一個的協程 (coroutine) 在 acquire() 中等待解鎖而被阻塞,最終只會有其中的一個被處理。

鎖的獲取方式是公平的:被處理的協程會是最早開始等待解鎖的那一個。

release()

釋放鎖。

如果鎖的狀態為 locked 則將其重置為 unlocked 並回傳。

如果鎖的狀態為 unlockedRuntimeError 會被引發。

locked()

如果鎖的狀態為 locked 則回傳 True

Event

class asyncio.Event

一個事件 (event) 物件。不支援執行緒安全。

一個 asyncio 事件可以被用於通知多個有發生某些事件於其中的 asyncio 任務。

一個 Event 物件會管理一個內部旗標 (flag),它可以透過 set() 方法來被設為 true 並透過 clear() 方法來重置為 falsewait() 方法會被阻塞 (block) 直到該旗標被設為 true。該旗標初始設置為 false

在 3.10 版的變更: 移除 loop 參數。

範例:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

持續等待直到事件被設置。

如果事件有被設置則立刻回傳 True。否則持續阻塞直到另一個任務呼叫 set()

set()

設置事件。

所有正在等待事件被設置的任務會立即被喚醒。

clear()

清除(還原)事件。

正透過 wait() 等待的 Tasks 現在會持續阻塞直到 set() 方法再次被呼叫。

is_set()

如果事件有被設置則回傳 True

Condition

class asyncio.Condition(lock=None)

一個條件 (codition) 物件。不支援執行緒安全。

一個 asyncio 條件原始物件可以被任務用來等待某事件發生,並獲得一個共享資源的獨佔存取權。

本質上,一個 Condition 物件會結合 EventLock 的功能。多個 Condition 物件共享一個 Lock 是有可能發生的,這能夠協調關注同一共享資源的不同狀態以獲取其獨佔存取權的多個任務。

可選的 lock 引數必須是一個 Lock 物件或者為 None。如為後者則一個新的 Lock 物件會被自動建立。

在 3.10 版的變更: 移除 loop 參數。

使用 Condition 的推薦方式是透過 async with 陳述式:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

這等價於:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

獲取底層的鎖。

此方法會持續等待直到底層的鎖為 unlocked,並將其設為 locked 並回傳 True

notify(n=1)

喚醒至多 n 個正在等待此條件的任務(預設為 1),如果沒有正在等待的任務則此方法為空操作 (no-op)。

在此方法被呼叫前必須先獲得鎖,並在之後立刻將其釋放。如果呼叫於一個 unlocked 的鎖則 RuntimeError 錯誤會被引發。

locked()

如果已獲取底層的鎖則回傳 True

notify_all()

喚醒所有正在等待此條件的任務。

這個方法的行為就像 notify(),但會喚醒所有正在等待的任務。

在此方法被呼叫前必須先獲得鎖,並在之後立刻將其釋放。如果呼叫於一個 unlocked 的鎖則 RuntimeError 錯誤會被引發。

release()

釋放底層的鎖。

當調用於一個未被解開的鎖之上時,會引發一個 RuntimeError

coroutine wait()

持續等待直到被通知 (notify)。

當此方法被呼叫時,如果呼叫它的任務還沒有獲取鎖的話,RuntimeError 會被引發。

此方法會釋放底層的鎖,然後持續阻塞直到被 notify()notify_all() 的呼叫所喚醒。一但被喚醒,Condition 會重新獲取該鎖且此方法會回傳 True

coroutine wait_for(predicate)

持續等待直到謂語 (predicate) 成為 true

謂語必須是一個結果可被直譯為一個 boolean 值的可呼叫物件 (callable)。最終值為回傳值。

Semaphore

class asyncio.Semaphore(value=1)

一個旗號 (semaphore) 物件。不支援執行緒安全。

一個旗號物件會管理一個內部計數器,會在每次呼叫 acquire() 時減少一、每次呼叫 release() 時增加一。此計數器永遠不會少於零;當 acquire() 發現它是零時,它會持續阻塞並等待某任務呼叫 release()

可選的 value 引數給定了內部計數器的初始值(預設為 1)。如給定的值少於 0ValueError 會被引發。

在 3.10 版的變更: 移除 loop 參數。

使用 Semaphore 的推薦方式是透過 async with 陳述式:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

這等價於:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

獲取一個旗號。

如果內部計數器大於零,將其減一並立刻回傳 True。如果為零,則持續等待直到 release() 被呼叫,並回傳 True

locked()

如果旗號無法立即被取得則回傳 True

release()

釋放一個旗號,並為其內部的計數器數值增加一。可以把一個正在等待獲取旗號的任務叫醒。

BoundedSemaphore 不同,Semaphore 允許 release() 的呼叫次數多於 acquire()

BoundedSemaphore

class asyncio.BoundedSemaphore(value=1)

一個有界的旗號物件。不支援執行緒安全。

Bounded Semaphore 是 Semaphore 的另一版本,如果其內部的計數器數值增加至大於初始 value 值的話,ValueError 會在 release() 時被引發。

在 3.10 版的變更: 移除 loop 參數。

Barrier

class asyncio.Barrier(parties)

一個屏障 (barrier) 物件。不支援執行緒安全。

屏障是一个允许阻塞直到 parties 个任务在其上等待的简单同步原语。 任务可以在 wait() 方法上等待并将被阻塞直到有指定数量的任务在 wait() 上等待。 在那时所有正在等待的任务将同时撤销阻塞。

async with 可以被用作在 wait() 上等待的替代。

屏障可被重复使用任意次数。

範例:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

该示例的结果为:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

在 3.11 版新加入.

coroutine wait()

穿过屏障。 当屏障汇集的所有任务都调用了此函数时,它们将被同时撤销阻塞。

当该屏障中等待或阻塞的任务被取消时,此任务将退出屏障而屏障将保持相同状态。 如果屏障的状态为 "正在填充",则等待的任务数量将减 1。

返回值是一个 0 到 parties-1 之间的整数,对于每个任务来说各不相同。 这可被用来选择一个任务以执行某些特别的操作。 例如:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

如果屏障在有任务在等待时已被破坏或重置则此方法可能会引发 BrokenBarrierError。 如果有任务被取消则它可能会引发 CancelledError

coroutine reset()

将屏障返回为默认的空白状态。 任何正在其上等待的任务将会接收到 BrokenBarrierError 异常。

如果屏障已被破坏则最好的是让其保持原状并创建一个新的屏障。

coroutine abort()

使屏障处于已破坏状态。 这会导致任何现有和未来对 wait() 的调用失败并引发 BrokenBarrierError。 例如可以在需要中止某个任务时使用此方法,以避免任务无限等待。

parties

请求穿过该屏障的任务的数量。

n_waiting

当执行填充时正在屏障中等待的任务的数量。

broken

一个布尔值,值为 True 表明栅栏为破损态。

exception asyncio.BrokenBarrierError

异常类,是 RuntimeError 异常的子类,在 Barrier 对象重置时仍有线程阻塞时和对象进入破损态时被引发。


在 3.9 版的變更: 透過 await lockyield from lock 和/或 with 陳述式 (with await lock, with (yield from lock)) 來獲取鎖的方式已被移除。請改用 async with lock