佇列 (Queues)

原始碼:Lib/asyncio/queues.py


asyncio 佇列被設計成與 queue 模組類似。儘管 asyncio 佇列不支援執行緒安全 (thread-safe),但他們是被設計來專用於 async/await 程式。

注意 asyncio 的佇列沒有 timeout 參數;請使用 asyncio.wait_for() 函式來為佇列新增具有超時 (timeout) 設定的操作。

另請參閱下方 Examples

Queue

class asyncio.Queue(maxsize=0)

先進先出 (FIFO) 佇列。

如果 maxsize 小於或等於零,則佇列尺寸是無限制的。如果是大於 0 的整數,則當佇列達到 maxsize 時,await put() 將會阻塞 (block),直到某個元素被 get() 取出。

不像標準函式庫中執行緒類型的 queue,佇列的尺寸一直是已知的,可以透過呼叫 qsize() 方法回傳。

在 3.10 版的變更: 移除 loop 參數。

這個類別是不支援執行緒安全的

maxsize

佇列中可存放的元素數量。

empty()

如果佇列為空則回傳 True,否則回傳 False

full()

如果有 maxsize 個條目在佇列中,則回傳 True

如果佇列用 maxsize=0 (預設)初始化,則 full() 永遠不會回傳 True

coroutine get()

從佇列中刪除並回傳一個元素。如果佇列為空,則持續等待直到佇列中有元素。

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait()

如果佇列內有值則立即回傳佇列中的元素,否則引發 QueueEmpty

coroutine join()

持續阻塞直到佇列中所有的元素都被接收和處理完畢。

當條目新增到佇列的時候,未完成任務的計數就會增加。每當一個消耗者 (consumer) 協程呼叫 task_done(),表示這個條目已經被取回且被它包含的所有工作都已完成,未完成任務計數就會減少。當未完成計數降到零的時候,join() 阻塞會被解除 (unblock)。

coroutine put(item)

將一個元素放進佇列。如果佇列滿了,在新增元素之前,會持續等待直到有空閒插槽 (free slot) 能被使用。

Raises QueueShutDown if the queue has been shut down.

put_nowait(item)

不阻塞地將一個元素放入佇列。

如果沒有立即可用的空閒插槽,引發 QueueFull

qsize()

回傳佇列中的元素數量。

shutdown(immediate=False)

Shut down the queue, making get() and put() raise QueueShutDown.

By default, get() on a shut down queue will only raise once the queue is empty. Set immediate to true to make get() raise immediately instead.

All blocked callers of put() and get() will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers of join().

在 3.13 版被加入.

task_done()

表示前面一個排隊的任務已經完成。

由佇列消耗者使用。對於每個用於獲取一個任務的 get(),接續的 task_done() 呼叫會告訴佇列這個任務的處理已經完成。

如果 join() 當前正在阻塞,在所有項目都被處理後會解除阻塞(意味著每個以 put() 放進佇列的條目都會收到一個 task_done())。

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

如果被呼叫的次數多於放入佇列中的項目數量,將引發 ValueError

Priority Queue(優先佇列)

class asyncio.PriorityQueue

Queue 的變形;按優先順序取出條目 (最小的先取出)。

條目通常是 (priority_number, data) 形式的 tuple(元組)。

LIFO Queue

class asyncio.LifoQueue

Queue 的變形,先取出最近新增的條目(後進先出)。

例外

exception asyncio.QueueEmpty

當佇列為空的時候,呼叫 get_nowait() 方法會引發這個例外。

exception asyncio.QueueFull

當佇列中條目數量已經達到它的 maxsize 時,呼叫 put_nowait() 方法會引發這個例外。

exception asyncio.QueueShutDown

Exception raised when put() or get() is called on a queue which has been shut down.

在 3.13 版被加入.

範例

佇列能被用於多個並行任務的工作分配:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())