큐¶
asyncio 큐는 queue
모듈의 클래스와 유사하도록 설계되었습니다. asyncio 큐는 스레드 안전하지 않지만, async/await 코드에서 사용되도록 설계되었습니다.
asyncio 큐의 메서드에는 timeout 매개 변수가 없습니다; 시간제한이 있는 큐 연산을 하려면 asyncio.wait_for()
함수를 사용하십시오.
아래의 예제 절도 참조하십시오.
Queue¶
-
class
asyncio.
Queue
(maxsize=0, *, loop=None)¶ 선입 선출 (FIFO) 큐.
maxsize가 0보다 작거나 같으면 큐 크기는 무한합니다.
0
보다 큰 정수면, 큐가 maxsize에 도달했을 때get()
이 항목을 제거할 때까지await put()
이 블록합니다.표준 라이브러리의 스레드를 쓰는
queue
와는 달리, 큐의 크기는 항상 알려져 있으며qsize()
메서드를 호출하여 얻을 수 있습니다.이 클래스는 스레드 안전하지 않습니다.
-
maxsize
¶ 큐에 허용되는 항목 수.
-
empty
()¶ 큐가 비어 있으면
True
를 반환하고, 그렇지 않으면False
를 반환합니다.
-
coroutine
get
()¶ 큐에서 항목을 제거하고 반환합니다. 큐가 비어 있으면, 항목이 들어올 때까지 기다립니다.
-
get_nowait
()¶ 항목을 즉시 사용할 수 있으면 항목을 반환하고, 그렇지 않으면
QueueEmpty
를 발생시킵니다.
-
coroutine
join
()¶ 큐의 모든 항목을 수신하여 처리할 때까지 블록합니다.
완료되지 않은 작업 수는 항목이 큐에 추가될 때마다 증가합니다. 이 수는 소비자 코루틴이 항목을 수신했고 그 항목에 관한 작업이 모두 완료되었음을 나타내는
task_done()
를 호출할 때마다 감소합니다. 완료되지 않은 작업 수가 0으로 떨어지면join()
가 블록 해제됩니다.
-
coroutine
put
(item)¶ 큐에 항목을 넣습니다. 큐가 가득 차면, 항목을 추가할 빈자리가 생길 때까지 기다립니다.
-
qsize
()¶ 큐에 있는 항목 수를 돌려줍니다.
-
task_done
()¶ 이전에 큐에 넣은 작업이 완료되었음을 나타냅니다.
큐 소비자가 사용합니다. 작업을 꺼내는 데 사용된
get()
마다, 뒤따르는task_done()
호출은 작업에 관한 처리가 완료되었음을 큐에 알려줍니다.join()
이 현재 블록 중이면, 모든 항목이 처리될 때 다시 시작됩니다 (큐에put()
한 모든 항목에 대해task_done()
호출이 수신되었음을 뜻합니다).큐에 넣은 항목보다 더 많이 호출되면
ValueError
를 발생시킵니다.
-
우선순위 큐¶
LIFO 큐¶
예외¶
-
exception
asyncio.
QueueEmpty
¶ 이 예외는
get_nowait()
메서드가 빈 큐에 호출될 때 발생합니다.
-
exception
asyncio.
QueueFull
¶ put_nowait()
메서드가 maxsize에 도달한 큐에 호출될 때 발생하는 예외입니다.
예제¶
큐를 사용하여 여러 동시 태스크로 작업 부하를 분산시킬 수 있습니다:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())