キュー¶
ソースコード: Lib/asyncio/queues.py
asyncio キューは queue モジュールのクラス群と同じ形になるように設計されています。 asyncio キューはスレッドセーフではありませんが、それらは async/await コードから使われるために特別に設計されています。
asyncio キューのメソッドは timeout パラメータを持たないことに注意してください; タイムアウトを伴うキューを使った処理を行うには asyncio.wait_for() 関数を使ってください。
下記の 使用例 節も参照してください。
Queue¶
- class asyncio.Queue(maxsize=0)¶
- 先入れ先出し (FIFO) キューです。 - maxsize がゼロ以下の場合、キューは無限長になります。 - 0より大きい整数の場合、キューが maxsize に達すると- await put()は- get()によってキューの要素が除去されるまでブロックします。- 標準ライブラリにおけるスレッドベースの - queueモジュールと異なり、キューのサイズは常に既知であり、- qsize()メソッドを呼び出すことによって取得することができます。- バージョン 3.10 で変更: loop パラメータが削除されました。 - このクラスは スレッド安全ではありません。 - maxsize¶
- キューに追加できるアイテム数です。 
 - empty()¶
- キューが空ならば - Trueを、そうでなければ- Falseを返します。
 - full()¶
- キューに要素が - maxsize個あれば- Trueを返します。- If the queue was initialized with - maxsize=0(the default), then- full()never returns- True.
 - async get()¶
- キューから要素を削除して返します。キューが空の場合項目が利用可能になるまで待機します。 - Raises - QueueShutDownif the queue has been shut down and is empty, or if the queue has been shut down immediately.
 - get_nowait()¶
- 直ちに利用できるアイテムがあるときはそれを、そうでなければ - QueueEmptyを返します。
 - async join()¶
- キューにある全ての要素が取得され、処理されるまでブロックします。 - 未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために - task_done()を呼び出すと減算されます。未完了のタスクのカウント値がゼロになると、- join()のブロックは解除されます。
 - async put(item)¶
- 要素をキューに入力します。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機します。 - Raises - QueueShutDownif the queue has been shut down.
 - qsize()¶
- キュー内の要素数を返します。 
 - shutdown(immediate=False)¶
- Put a - Queueinstance into a shutdown mode.- The queue can no longer grow. Future calls to - put()raise- QueueShutDown. Currently blocked callers of- put()will be unblocked and will raise- QueueShutDownin the formerly blocked thread.- If immediate is false (the default), the queue can be wound down normally with - get()calls to extract tasks that have already been loaded.- And if - task_done()is called for each remaining task, a pending- join()will be unblocked normally.- Once the queue is empty, future calls to - get()will raise- QueueShutDown.- If immediate is true, the queue is terminated immediately. The queue is drained to be completely empty and the count of unfinished tasks is reduced by the number of tasks drained. If unfinished tasks is zero, callers of - join()are unblocked. Also, blocked callers of- get()are unblocked and will raise- QueueShutDownbecause the queue is empty.- Use caution when using - join()with immediate set to true. This unblocks the join even when no work has been done on the tasks, violating the usual invariant for joining a queue.- Added in version 3.13. 
 - task_done()¶
- Indicate that a formerly enqueued work item is complete. - Used by queue consumers. For each - get()used to fetch a work item, a subsequent call to- task_done()tells the queue that the processing on the work item is complete.- join()が現在ブロック中だった場合、全アイテムが処理されたとき (- put()でキューに追加された全アイテムの- task_done()の呼び出しを受信したとき) に再開します。- キューに追加されているアイテム数以上の呼び出しが行われたときに - ValueErrorを送出します。
 
優先度付きのキュー¶
LIFO キュー¶
例外¶
- exception asyncio.QueueEmpty¶
- この例外は - get_nowait()メソッドが空のキューに対して呼ばれたときに送出されます。
- exception asyncio.QueueFull¶
- サイズが maxsize に達したキューに対して - put_nowait()メソッドが 呼ばれたときに送出される例外です。
使用例¶
キューを使って、並行処理を行う複数のタスクにワークロードを分散させることができます:
import asyncio
import random
import time
async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()
        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)
        # Notify the queue that the "work item" has been processed.
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()
    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())