佇列 (Queues)¶
asyncio 佇列被設計成與 queue
模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。
注意 asyncio 的佇列沒有 timeout 參數;請使用 asyncio.wait_for()
函式來為佇列新增具有超時 (timeout) 設定的操作。
另請參閱下方 Examples。
Queue¶
- class asyncio.Queue(maxsize=0)¶
先進先出 (FIFO) 佇列。
如果 maxsize 小於或等於零,則佇列尺寸是無限制的。如果是大於
0
的整數,則當佇列達到 maxsize 時,await put()
將會阻塞 (block),直到某個元素被get()
取出。不像標準函式庫中執行緒類型的
queue
,佇列的尺寸一直是已知的,可以透過呼叫qsize()
方法回傳。在 3.10 版的變更: 移除 loop 參數。
這個類別是不支援執行緒安全的。
- maxsize¶
佇列中可存放的元素數量。
- empty()¶
如果佇列為空則回傳
True
,否則回傳False
。
- coroutine get()¶
從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。
- get_nowait()¶
如果佇列內有值則立即回傳佇列中的元素,否則引發
QueueEmpty
。
- coroutine join()¶
持續阻塞直到佇列中所有的元素都被接收和處理完畢。
當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫
task_done()
,表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join()
阻塞會被解除 (unblock)。
- coroutine put(item)¶
將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。
- qsize()¶
回傳佇列中的元素數量。
- task_done()¶
表示前面一個排隊的任務已經完成。
由佇列消耗者使用。對於每個用於獲取一個任務的
get()
,接續的task_done()
呼叫會告訴佇列這個任務的處理已經完成。如果
join()
當前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以put()
放進佇列的條目都會收到一個task_done()
)。如果被呼叫的次數多於放入佇列中的項目數量,將引發
ValueError
。
Priority Queue(優先佇列)¶
LIFO Queue¶
例外¶
- exception asyncio.QueueEmpty¶
當佇列為空的時候,呼叫
get_nowait()
方法會引發這個例外。
- exception asyncio.QueueFull¶
當佇列中條目數量已經達到它的 maxsize 時,呼叫
put_nowait()
方法會引發這個例外。
範例¶
佇列能被用於多個並行任務的工作分配:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())