佇列 (Queues)

原始碼:Lib/asyncio/queues.py


asyncio 佇列被設計成與 queue 模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。

注意 asyncio 的佇列沒有 timeout 參數;請使用 asyncio.wait_for() 函式來為佇列新增具有超時 (timeout) 設定的操作。

另請參閱下方 Examples

Queue

class asyncio.Queue(maxsize=0)

先進先出 (FIFO) 佇列。

如果 maxsize 小於或等於零,則佇列尺寸是無限制的。如果是大於 0 的整數,則當佇列達到 maxsize 時,await put() 將會阻塞 (block),直到某個元素被 get() 取出。

不像標準函式庫中執行緒類型的 queue,佇列的尺寸一直是已知的,可以透過呼叫 qsize() 方法回傳。

在 3.10 版的變更: 移除 loop 參數。

這個類別是不支援執行緒安全的

maxsize

佇列中可存放的元素數量。

empty()

如果佇列為空則回傳 True,否則回傳 False

full()

如果有 maxsize 個條目在佇列中,則回傳 True

如果佇列用 maxsize=0 (預設)初始化,則 full() 永遠不會回傳 True

coroutine get()

從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。

get_nowait()

如果佇列內有值則立即回傳佇列中的元素,否則引發 QueueEmpty

coroutine join()

持續阻塞直到佇列中所有的元素都被接收和處理完畢。

當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫 task_done(),表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join() 阻塞會被解除 (unblock)。

coroutine put(item)

將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。

put_nowait(item)

不阻塞地將一個元素放入佇列。

如果沒有立即可用的空閒插槽,引發 QueueFull

qsize()

回傳佇列中的元素數量。

task_done()

表示前面一個排隊的任務已經完成。

由佇列消耗者使用。對於每個用於獲取一個任務的 get(),接續的 task_done() 呼叫會告訴佇列這個任務的處理已經完成。

如果 join() 當前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以 put() 放進佇列的條目都會收到一個 task_done())。

如果被呼叫的次數多於放入佇列中的項目數量,將引發 ValueError

Priority Queue(優先佇列)

class asyncio.PriorityQueue

Queue 的變形;按優先順序取出條目 (最小的先取出)。

條目通常是 (priority_number, data) 形式的 tuple(元組)。

LIFO Queue

class asyncio.LifoQueue

Queue 的變形,先取出最近新增的條目(後進先出)。

例外

exception asyncio.QueueEmpty

當佇列為空的時候,呼叫 get_nowait() 方法會引發這個例外。

exception asyncio.QueueFull

當佇列中條目數量已經達到它的 maxsize 時,呼叫 put_nowait() 方法會引發這個例外。

範例

佇列能被用於多個並行任務的工作分配:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())