キュー

asyncio queues are designed to be similar to classes of the queue module. Although asyncio queues are not thread-safe, they are designed to be used specifically in async/await code.

Note that methods of asyncio queues don't have a timeout parameter; use asyncio.wait_for() function to do queue operations with a timeout.

See also the Examples section below.

Queue

class asyncio.Queue(maxsize=0, *, loop=None)

A first in, first out (FIFO) queue.

If maxsize is less than or equal to zero, the queue size is infinite. If it is an integer greater than 0, then await put() blocks when the queue reaches maxsize until an item is removed by get().

Unlike the standard library threading queue, the size of the queue is always known and can be returned by calling the qsize() method.

このクラスは スレッド安全ではありません

maxsize

キューに追加できるアイテム数です。

empty()

キューが空ならば True を、そうでなければ False を返します。

full()

キューに要素が maxsize 個あれば True を返します。

If the queue was initialized with maxsize=0 (the default), then full() never returns True.

coroutine get()

キューから要素を削除して返します。キューが空の場合項目が利用可能になるまで待機します。

get_nowait()

直ちに利用できるアイテムがあるときはそれを、そうでなければ QueueEmpty を返します。

coroutine join()

Block until all items in the queue have been received and processed.

未完了のタスクのカウント値はキューにアイテムが追加されるときは常に加算され、コンシューマースレッドが task_done() を呼び出してアイテムの回収とその全処理の完了が示されるときは常に減算されます。未完了のタスクのカウント値がゼロになった場合、join() のブロックが解除されます。

coroutine put(item)

Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.

put_nowait(item)

ブロックせずにアイテムをキューに追加します。

直ちに利用できるスロットがない場合、QueueFull を送出します。

qsize()

Return the number of items in the queue.

task_done()

キューに入っていたタスクが完了したことを示します。

キューコンシューマーによって使用されます。タスクの取得に get() を使用し、その後の task_done() の呼び出しでタスクの処理が完了したことをキューに通知します。

join() が現在ブロック中だった場合、全アイテムが処理されたとき (put() でキューに追加された全アイテムの task_done() の呼び出しを受信したとき) に再開します。

キューに追加されているアイテム数以上の呼び出しが行われたときに ValueError を送出します。

Priority Queue

class asyncio.PriorityQueue

A variant of Queue; retrieves entries in priority order (lowest first).

Entries are typically tuples of the form (priority_number, data).

LIFO Queue

class asyncio.LifoQueue

A variant of Queue that retrieves most recently added entries first (last in, first out).

例外

exception asyncio.QueueEmpty

This exception is raised when the get_nowait() method is called on an empty queue.

exception asyncio.QueueFull

Exception raised when the put_nowait() method is called on a queue that has reached its maxsize.

使用例

Queues can be used to distribute workload between several concurrent tasks:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())