同步化原始物件 (Synchronization Primitives)¶
asyncio 的同步化原始物件被設計成和那些 threading
模組 (module) 中的同名物件相似,但有兩個重要的限制條件:
asyncio 原始物件並不支援執行緒安全 (thread-safe),因此他們不可被用於 OS 執行緒同步化(請改用
threading
);這些同步化原始物件的方法 (method) 並不接受 timeout 引數;要達成有超時 (timeout) 設定的操作請改用
asyncio.wait_for()
函式。
asyncio 有以下基礎同步化原始物件:
Lock¶
- class asyncio.Lock¶
實作了一個給 asyncio 任務 (task) 用的互斥鎖 (mutex lock)。不支援執行緒安全。
一個 asyncio 的鎖可以用來確保一個共享資源的存取權被獨佔。
使用 Lock 的推薦方式是透過
async with
陳述式:lock = asyncio.Lock() # ... later async with lock: # access shared state
這等價於:
lock = asyncio.Lock() # ... later await lock.acquire() try: # access shared state finally: lock.release()
在 3.10 版的變更: 移除
loop
參數。- coroutine acquire()¶
獲得鎖。
此方法會持續等待直到鎖的狀態成為 unlocked,並將其設置為 locked 和回傳
True
。當多於一個的協程 (coroutine) 在
acquire()
中等待解鎖而被阻塞,最終只會有其中的一個被處理。鎖的獲取方式是公平的:被處理的協程會是最早開始等待解鎖的那一個。
- release()¶
釋放鎖。
如果鎖的狀態為 locked 則將其重置為 unlocked 並回傳。
如果鎖的狀態為 unlocked 則
RuntimeError
會被引發。
- locked()¶
如果鎖的狀態為 locked 則回傳
True
。
Event¶
- class asyncio.Event¶
一個事件 (event) 物件。不支援執行緒安全。
一個 asyncio 事件可以被用於通知多個有發生某些事件於其中的 asyncio 任務。
一個 Event 物件會管理一個內部旗標 (flag),它可以透過
set()
方法來被設為 true 並透過clear()
方法來重置為 false。wait()
方法會被阻塞 (block) 直到該旗標被設為 true。該旗標初始設置為 false。在 3.10 版的變更: 移除
loop
參數。範例:
async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
- set()¶
設置事件。
所有正在等待事件被設置的任務會立即被喚醒。
- is_set()¶
如果事件有被設置則回傳
True
。
Condition¶
- class asyncio.Condition(lock=None)¶
一個條件 (codition) 物件。不支援執行緒安全。
一個 asyncio 條件原始物件可以被任務用來等待某事件發生,並獲得一個共享資源的獨佔存取權。
本質上,一個 Condition 物件會結合
Event
和Lock
的功能。多個 Condition 物件共享一個 Lock 是有可能發生的,這能夠協調關注同一共享資源的不同狀態以獲取其獨佔存取權的多個任務。可選的 lock 引數必須是一個
Lock
物件或者為None
。如為後者則一個新的 Lock 物件會被自動建立。在 3.10 版的變更: 移除
loop
參數。使用 Condition 的推薦方式是透過
async with
陳述式:cond = asyncio.Condition() # ... later async with cond: await cond.wait()
這等價於:
cond = asyncio.Condition() # ... later await cond.acquire() try: await cond.wait() finally: cond.release()
- coroutine acquire()¶
獲取底層的鎖。
此方法會持續等待直到底層的鎖為 unlocked,並將其設為 locked 並回傳
True
。
- notify(n=1)¶
喚醒至多 n 個正在等待此條件的任務(預設為 1),如果沒有正在等待的任務則此方法為空操作 (no-op)。
在此方法被呼叫前必須先獲得鎖,並在之後立刻將其釋放。如果呼叫於一個 unlocked 的鎖則
RuntimeError
錯誤會被引發。
- locked()¶
如果已獲取底層的鎖則回傳
True
。
- notify_all()¶
喚醒所有正在等待此條件的任務。
這個方法的行為就像
notify()
,但會喚醒所有正在等待的任務。在此方法被呼叫前必須先獲得鎖,並在之後立刻將其釋放。如果呼叫於一個 unlocked 的鎖則
RuntimeError
錯誤會被引發。
- release()¶
釋放底層的鎖。
當調用於一個未被解開的鎖之上時,會引發一個
RuntimeError
。
- coroutine wait()¶
持續等待直到被通知 (notify)。
當此方法被呼叫時,如果呼叫它的任務還沒有獲取鎖的話,
RuntimeError
會被引發。此方法會釋放底層的鎖,然後持續阻塞直到被
notify()
或notify_all()
的呼叫所喚醒。一但被喚醒,Condition 會重新獲取該鎖且此方法會回傳True
。
- coroutine wait_for(predicate)¶
持續等待直到謂語 (predicate) 成為 true。
謂語必須是一個結果可被直譯為一個 boolean 值的可呼叫物件 (callable)。最終值為回傳值。
Semaphore¶
- class asyncio.Semaphore(value=1)¶
一個旗號 (semaphore) 物件。不支援執行緒安全。
一個旗號物件會管理一個內部計數器,會在每次呼叫
acquire()
時減少一、每次呼叫release()
時增加一。此計數器永遠不會少於零;當acquire()
發現它是零時,它會持續阻塞並等待某任務呼叫release()
。可選的 value 引數給定了內部計數器的初始值(預設為
1
)。如給定的值少於0
則ValueError
會被引發。在 3.10 版的變更: 移除
loop
參數。使用 Semaphore 的推薦方式是透過
async with
陳述式:sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource
這等價於:
sem = asyncio.Semaphore(10) # ... later await sem.acquire() try: # work with shared resource finally: sem.release()
- locked()¶
如果旗號無法立即被取得則回傳
True
。
- release()¶
釋放一個旗號,並為其內部的計數器數值增加一。可以把一個正在等待獲取旗號的任務叫醒。
和
BoundedSemaphore
不同,Semaphore
允許release()
的呼叫次數多於acquire()
。
BoundedSemaphore¶
- class asyncio.BoundedSemaphore(value=1)¶
一個有界的旗號物件。不支援執行緒安全。
Bounded Semaphore 是
Semaphore
的另一版本,如果其內部的計數器數值增加至大於初始 value 值的話,ValueError
會在release()
時被引發。在 3.10 版的變更: 移除
loop
參數。
Barrier¶
- class asyncio.Barrier(parties)¶
一個屏障 (barrier) 物件。不支援執行緒安全。
屏障是一个允许阻塞直到 parties 个任务在其上等待的简单同步原语。 任务可以在
wait()
方法上等待并将被阻塞直到有指定数量的任务在wait()
上等待。 在那时所有正在等待的任务将同时撤销阻塞。async with
可以被用作在wait()
上等待的替代。屏障可被重复使用任意次数。
範例:
async def example_barrier(): # barrier with 3 parties b = asyncio.Barrier(3) # create 2 new waiting tasks asyncio.create_task(b.wait()) asyncio.create_task(b.wait()) await asyncio.sleep(0) print(b) # The third .wait() call passes the barrier await b.wait() print(b) print("barrier passed") await asyncio.sleep(0) print(b) asyncio.run(example_barrier())
该示例的结果为:
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]> <asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]> barrier passed <asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
在 3.11 版新加入.
- coroutine wait()¶
穿过屏障。 当屏障汇集的所有任务都调用了此函数时,它们将被同时撤销阻塞。
当该屏障中等待或阻塞的任务被取消时,此任务将退出屏障而屏障将保持相同状态。 如果屏障的状态为 "正在填充",则等待的任务数量将减 1。
返回值是一个 0 到
parties-1
之间的整数,对于每个任务来说各不相同。 这可被用来选择一个任务以执行某些特别的操作。 例如:... async with barrier as position: if position == 0: # Only one task prints this print('End of *draining phase*')
如果屏障在有任务在等待时已被破坏或重置则此方法可能会引发
BrokenBarrierError
。 如果有任务被取消则它可能会引发CancelledError
。
- coroutine reset()¶
将屏障返回为默认的空白状态。 任何正在其上等待的任务将会接收到
BrokenBarrierError
异常。如果屏障已被破坏则最好的是让其保持原状并创建一个新的屏障。
- coroutine abort()¶
使屏障处于已破坏状态。 这会导致任何现有和未来对
wait()
的调用失败并引发BrokenBarrierError
。 例如可以在需要中止某个任务时使用此方法,以避免任务无限等待。
- parties¶
请求穿过该屏障的任务的数量。
- n_waiting¶
当执行填充时正在屏障中等待的任务的数量。
- broken¶
一个布尔值,值为
True
表明栅栏为破损态。
- exception asyncio.BrokenBarrierError¶
异常类,是
RuntimeError
异常的子类,在Barrier
对象重置时仍有线程阻塞时和对象进入破损态时被引发。
在 3.9 版的變更: 透過 await lock
或 yield from lock
和/或 with
陳述式 (with await lock
, with (yield from lock)
) 來獲取鎖的方式已被移除。請改用 async with lock
。