キュー

ソースコード: Lib/asyncio/queues.py


asyncio キューは queue モジュールのクラス群と同じ形になるように設計されています。 asyncio キューはスレッドセーフではありませんが、それらは async/await コードから使われるために特別に設計されています。

asyncio キューのメソッドは timeout パラメータを持たないことに注意してください; タイムアウトを伴うキューを使った処理を行うには asyncio.wait_for() 関数を使ってください。

下記の 使用例 節も参照してください。

Queue

class asyncio.Queue(maxsize=0)

先入れ先出し (FIFO) キューです。

maxsize がゼロ以下の場合、キューは無限長になります。 0 より大きい整数の場合、キューが maxsize に達すると await put()get() によってキューの要素が除去されるまでブロックします。

標準ライブラリにおけるスレッドベースの queue モジュールと異なり、キューのサイズは常に既知であり、 qsize() メソッドを呼び出すことによって取得することができます。

バージョン 3.10 で変更: loop パラメータが削除されました。

このクラスは スレッド安全ではありません

maxsize

キューに追加できるアイテム数です。

empty()

キューが空ならば True を、そうでなければ False を返します。

full()

キューに要素が maxsize 個あれば True を返します。

If the queue was initialized with maxsize=0 (the default), then full() never returns True.

coroutine get()

キューから要素を削除して返します。キューが空の場合項目が利用可能になるまで待機します。

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait()

直ちに利用できるアイテムがあるときはそれを、そうでなければ QueueEmpty を返します。

coroutine join()

キューにある全ての要素が取得され、処理されるまでブロックします。

未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために task_done() を呼び出すと減算されます。未完了のタスクのカウント値がゼロになると、 join() のブロックは解除されます。

coroutine put(item)

要素をキューに入力します。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機します。

Raises QueueShutDown if the queue has been shut down.

put_nowait(item)

ブロックせずにアイテムをキューに追加します。

直ちに利用できるスロットがない場合、QueueFull を送出します。

qsize()

キュー内の要素数を返します。

shutdown(immediate=False)

Shut down the queue, making get() and put() raise QueueShutDown.

By default, get() on a shut down queue will only raise once the queue is empty. Set immediate to true to make get() raise immediately instead.

All blocked callers of put() and get() will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers of join().

Added in version 3.13.

task_done()

Indicate that a formerly enqueued work item is complete.

Used by queue consumers. For each get() used to fetch a work item, a subsequent call to task_done() tells the queue that the processing on the work item is complete.

join() が現在ブロック中だった場合、全アイテムが処理されたとき (put() でキューに追加された全アイテムの task_done() の呼び出しを受信したとき) に再開します。

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

キューに追加されているアイテム数以上の呼び出しが行われたときに ValueError を送出します。

優先度付きのキュー

class asyncio.PriorityQueue

Queue の変種です; 優先順位にしたがって要素を取り出します (最低順位が最初に取り出されます)。

項目は典型的には (priority_number, data) 形式のタプルです。

LIFO キュー

class asyncio.LifoQueue

Queue の変種で、最後に追加された項目を最初に取り出します (後入れ先出し、またはスタック)。

例外

exception asyncio.QueueEmpty

この例外は get_nowait() メソッドが空のキューに対して呼ばれたときに送出されます。

exception asyncio.QueueFull

サイズが maxsize に達したキューに対して put_nowait() メソッドが 呼ばれたときに送出される例外です。

exception asyncio.QueueShutDown

Exception raised when put() or get() is called on a queue which has been shut down.

Added in version 3.13.

使用例

キューを使って、並行処理を行う複数のタスクにワークロードを分散させることができます:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())