キュー¶
ソースコード: Lib/asyncio/queues.py
asyncio キューは queue
モジュールのクラス群と同じ形になるように設計されています。 asyncio キューはスレッドセーフではありませんが、それらは async/await コードから使われるために特別に設計されています。
asyncio キューのメソッドは timeout パラメータを持たないことに注意してください; タイムアウトを伴うキューを使った処理を行うには asyncio.wait_for()
関数を使ってください。
下記の 使用例 節も参照してください。
Queue¶
-
class
asyncio.
Queue
(maxsize=0)¶ 先入れ先出し (FIFO) キューです。
maxsize がゼロ以下の場合、キューは無限長になります。
0
より大きい整数の場合、キューが maxsize に達するとawait put()
はget()
によってキューの要素が除去されるまでブロックします。標準ライブラリにおけるスレッドベースの
queue
モジュールと異なり、キューのサイズは常に既知であり、qsize()
メソッドを呼び出すことによって取得することができます。バージョン 3.10 で変更: loop パラメータが削除されました。
このクラスは スレッド安全ではありません。
-
maxsize
¶ キューに追加できるアイテム数です。
-
empty
()¶ キューが空ならば
True
を、そうでなければFalse
を返します。
-
full
()¶ キューに要素が
maxsize
個あればTrue
を返します。キューが
maxsize=0
(デフォルト値) で初期化された場合、full()
メソッドがTrue
を返すことはありません。
-
coroutine
get
()¶ キューから要素を削除して返します。キューが空の場合項目が利用可能になるまで待機します。
-
get_nowait
()¶ 直ちに利用できるアイテムがあるときはそれを、そうでなければ
QueueEmpty
を返します。
-
coroutine
join
()¶ キューにある全ての要素が取得され、処理されるまでブロックします。
未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために
task_done()
を呼び出すと減算されます。未完了のタスクのカウント値がゼロになると、join()
のブロックは解除されます。
-
coroutine
put
(item)¶ 要素をキューに入力します。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機します。
-
qsize
()¶ キュー内の要素数を返します。
-
task_done
()¶ キューに入っていたタスクが完了したことを示します。
キューコンシューマーによって使用されます。タスクの取得に
get()
を使用し、その後のtask_done()
の呼び出しでタスクの処理が完了したことをキューに通知します。join()
が現在ブロック中だった場合、全アイテムが処理されたとき (put()
でキューに追加された全アイテムのtask_done()
の呼び出しを受信したとき) に再開します。キューに追加されているアイテム数以上の呼び出しが行われたときに
ValueError
を送出します。
-
優先度付きのキュー¶
LIFO キュー¶
例外¶
-
exception
asyncio.
QueueEmpty
¶ この例外は
get_nowait()
メソッドが空のキューに対して呼ばれたときに送出されます。
-
exception
asyncio.
QueueFull
¶ サイズが maxsize に達したキューに対して
put_nowait()
メソッドが 呼ばれたときに送出される例外です。
使用例¶
キューを使って、並行処理を行う複数のタスクにワークロードを分散させることができます:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())