キュー

asyncio キューは queue モジュールのクラス群と同じ形になるように設計されています。 asyncio キューはスレッドセーフではありませんが、それらは async/await コードから使われるために特別に設計されています。

asyncio キューのメソッドは timeout パラメータを持たないことに注意してください; タイムアウトを伴うキューを使った処理を行うには asyncio.wait_for() 関数を使ってください。

下記の 使用例 節も参照してください。

Queue

class asyncio.Queue(maxsize=0, *, loop=None)

先入れ先出し (FIFO) キューです。

maxsize がゼロ以下の場合、キューは無限長になります。 0 より大きい整数の場合、キューが maxsize に達すると await put()get() によってキューの要素が除去されるまでブロックします。

標準ライブラリにおけるスレッドベースの queue モジュールと異なり、キューのサイズは常に既知であり、 qsize() メソッドを呼び出すことによって取得することができます。

このクラスは スレッド安全ではありません

maxsize

キューに追加できるアイテム数です。

empty()

キューが空ならば True を、そうでなければ False を返します。

full()

キューに要素が maxsize 個あれば True を返します。

キューが maxsize=0 (デフォルト値) で初期化された場合、 full() メソッドが True を返すことはありません。

coroutine get()

キューから要素を削除して返します。キューが空の場合項目が利用可能になるまで待機します。

get_nowait()

直ちに利用できるアイテムがあるときはそれを、そうでなければ QueueEmpty を返します。

coroutine join()

キューにある全ての要素が取得され、処理されるまでブロックします。

未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために task_done() を呼び出すと減算されます。未完了のタスクのカウント値がゼロになると、 join() のブロックは解除されます。

coroutine put(item)

要素をキューに入力します。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機します。

put_nowait(item)

ブロックせずにアイテムをキューに追加します。

直ちに利用できるスロットがない場合、QueueFull を送出します。

qsize()

キュー内の要素数を返します。

task_done()

キューに入っていたタスクが完了したことを示します。

キューコンシューマーによって使用されます。タスクの取得に get() を使用し、その後の task_done() の呼び出しでタスクの処理が完了したことをキューに通知します。

join() が現在ブロック中だった場合、全アイテムが処理されたとき (put() でキューに追加された全アイテムの task_done() の呼び出しを受信したとき) に再開します。

キューに追加されているアイテム数以上の呼び出しが行われたときに ValueError を送出します。

優先度付きのキュー

class asyncio.PriorityQueue

Queue の変種です; 優先順位にしたがって要素を取り出します (最低順位が最初に取り出されます)。

項目は典型的には (priority_number, data) 形式のタプルです。

LIFO キュー

class asyncio.LifoQueue

Queue の変種で、最後に追加された項目を最初に取り出します (後入れ先出し、またはスタック)。

例外

exception asyncio.QueueEmpty

この例外は get_nowait() メソッドが空のキューに対して呼ばれたときに送出されます。

exception asyncio.QueueFull

サイズが maxsize に達したキューに対して put_nowait() メソッドが 呼ばれたときに送出される例外です。

使用例

キューを使って、並行処理を行う複数のタスクにワークロードを分散させることができます:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())