队列集¶
asyncio 队列被设计成与 queue 模块类似。尽管 asyncio队列不是线程安全的,但是他们是被设计专用于 async/await 代码。
注意asyncio 的队列没有 timeout 形参;请使用 asyncio.wait_for() 函数为队列添加超时操作。
参见下面的 Examples 部分。
Queue¶
- class asyncio.Queue(maxsize=0)¶
- 先进,先出(FIFO)队列 - 如果 maxsize 小于等于零,则队列尺寸是无限的。如果是大于 - 0的整数,则当队列达到 maxsize 时,- await put()将阻塞至某个元素被- get()取出。- 不像标准库中的并发型 - queue,队列的尺寸一直是已知的,可以通过调用- qsize()方法返回。- 在 3.10 版本发生变更: 移除了 loop 形参。 - 这个类不是线程安全的(not thread safe)。 - maxsize¶
- 队列中可存放的元素数量。 
 - empty()¶
- 如果队列为空返回 - True,否则返回- False。
 - async get()¶
- 从队列中删除并返回一个元素。如果队列为空,则等待,直到队列中有元素。 - 如果队列已被关闭并且为空,或者如果队列已被立即关闭则会引发 - QueueShutDown。
 - get_nowait()¶
- 立即返回一个队列中的元素,如果队列内有值,否则引发异常 - QueueEmpty。
 - async join()¶
- 阻塞至队列中所有的元素都被接收和处理完毕。 - 当条目添加到队列的时候,未完成任务的计数就会增加。每当消费协程调用 - task_done()表示这个条目已经被回收,该条目所有工作已经完成,未完成计数就会减少。当未完成计数降到零的时候,- join()阻塞被解除。
 - async put(item)¶
- 添加一个元素进队列。如果队列满了,在添加元素之前,会一直等待空闲插槽可用。 - 如果队列已被关闭则会引发 - QueueShutDown。
 - qsize()¶
- 返回队列用的元素数量。 
 - shutdown(immediate=False)¶
- 将一个 - Queue实例置为关闭模式。- 该队列将不可再增长。 未来对 - put()的调用将引发- QueueShutDown。 当前被阻塞的- put()调用方将被取消阻塞并会在之前被阻塞的线程中引发- QueueShutDown。- 如果 immediate 为假值(默认),则队列可通过 - get()调用正常缩减以提取已被加载的任务。- 而如果 - task_done()针对每个剩余的任务被调用,则挂起的- join()将被正常取消阻塞。- 一旦队列为空,未来对 - get()的调用将会引发- QueueShutDown。- 如果 immediate 为真值,则队列会被立即终结。 列队将缩减至完全为空。 所有的 - join()调用方将被取消阻塞而不考虑未完成的任务数量。 被阻塞的- get()调用方将被取消阻塞并将引发- QueueShutDown因为队列已为空。- 在 immediate 设为真值的情况下使用 - join()需要小心谨慎。 这会取消对已加入任务的阻塞即使任务尚未被执行,从而破坏通常的加入队列任务的不变性。- Added in version 3.13. 
 - task_done()¶
- 表明之前加入队列的工作条目已经完成。 - 由队列的消费者使用。 对于每个被用于获取工作条目的 - get(),将有一个对- task_done()的后续调用来告诉队列工作条目的操作已完成。- 如果 - join()当前正在阻塞,在所有条目都被处理后,将解除阻塞(意味着每个- put()进队列的条目的- task_done()都被收到)。- 如果被调用的次数多于放入队列中的项目数量,将引发 - ValueError。
 
优先级队列¶
后进先出队列¶
异常¶
- exception asyncio.QueueEmpty¶
- 当队列为空的时候,调用 - get_nowait()方法而引发这个异常。
- exception asyncio.QueueFull¶
- 当队列中条目数量已经达到它的 maxsize 的时候,调用 - put_nowait()方法而引发的异常。
例子¶
队列能被用于多个的并发任务的工作量分配:
import asyncio
import random
import time
async def worker(name, queue):
    while True:
        # 从队列获取一个“工作项”。
        sleep_for = await queue.get()
        # 休眠 "sleep_for" 秒。
        await asyncio.sleep(sleep_for)
        # 通知队列“工作项”已被处理。
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    # 创建一个用于存储我们的“工作项”的队列。
    queue = asyncio.Queue()
    # 生成随机时段并将它们放入队列。
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    # 创建三个工作任务来并发地处理队列。
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    # 等待直到队列处理完毕。
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    # 取消我们的工作任务。
    for task in tasks:
        task.cancel()
    # 等待直到所有工作任务都被取消。
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())