큐¶
소스 코드: Lib/asyncio/queues.py
asyncio 큐는 queue 모듈의 클래스와 유사하도록 설계되었습니다. asyncio 큐는 스레드 안전하지 않지만, async/await 코드에서 사용되도록 설계되었습니다.
asyncio 큐의 메서드에는 timeout 매개 변수가 없습니다; 시간제한이 있는 큐 연산을 하려면 asyncio.wait_for() 함수를 사용하십시오.
아래의 예제 절도 참조하십시오.
Queue¶
- class asyncio.Queue(maxsize=0)¶
- 선입 선출 (FIFO) 큐. - maxsize가 0보다 작거나 같으면 큐 크기는 무한합니다. - 0보다 큰 정수면, 큐가 maxsize에 도달했을 때- get()이 항목을 제거할 때까지- await put()이 블록합니다.- 표준 라이브러리의 스레드를 쓰는 - queue와는 달리, 큐의 크기는 항상 알려져 있으며- qsize()메서드를 호출하여 얻을 수 있습니다.- 버전 3.10에서 변경: loop 매개 변수를 제거했습니다. - 이 클래스는 스레드 안전하지 않습니다. - maxsize¶
- 큐에 허용되는 항목 수. 
 - empty()¶
- 큐가 비어 있으면 - True를 반환하고, 그렇지 않으면- False를 반환합니다.
 - async get()¶
- 큐에서 항목을 제거하고 반환합니다. 큐가 비어 있으면, 항목이 들어올 때까지 기다립니다. - Raises - QueueShutDownif the queue has been shut down and is empty, or if the queue has been shut down immediately.
 - get_nowait()¶
- 항목을 즉시 사용할 수 있으면 항목을 반환하고, 그렇지 않으면 - QueueEmpty를 발생시킵니다.
 - async join()¶
- 큐의 모든 항목을 수신하여 처리할 때까지 블록합니다. - 완료되지 않은 작업 수는 항목이 큐에 추가될 때마다 증가합니다. 이 수는 소비자 코루틴이 항목을 수신했고 그 항목에 관한 작업이 모두 완료되었음을 나타내는 - task_done()를 호출할 때마다 감소합니다. 완료되지 않은 작업 수가 0으로 떨어지면- join()가 블록 해제됩니다.
 - async put(item)¶
- 큐에 항목을 넣습니다. 큐가 가득 차면, 항목을 추가할 빈자리가 생길 때까지 기다립니다. - Raises - QueueShutDownif the queue has been shut down.
 - qsize()¶
- 큐에 있는 항목 수를 돌려줍니다. 
 - shutdown(immediate=False)¶
- Put a - Queueinstance into a shutdown mode.- The queue can no longer grow. Future calls to - put()raise- QueueShutDown. Currently blocked callers of- put()will be unblocked and will raise- QueueShutDownin the formerly blocked thread.- If immediate is false (the default), the queue can be wound down normally with - get()calls to extract tasks that have already been loaded.- And if - task_done()is called for each remaining task, a pending- join()will be unblocked normally.- Once the queue is empty, future calls to - get()will raise- QueueShutDown.- If immediate is true, the queue is terminated immediately. The queue is drained to be completely empty. All callers of - join()are unblocked regardless of the number of unfinished tasks. Blocked callers of- get()are unblocked and will raise- QueueShutDownbecause the queue is empty.- Use caution when using - join()with immediate set to true. This unblocks the join even when no work has been done on the tasks, violating the usual invariant for joining a queue.- Added in version 3.13. 
 - task_done()¶
- 이전에 큐에 넣은 작업 항목이 완료되었음을 나타냅니다. - 큐 소비자가 사용합니다. 작업 항목을 꺼내는 데 사용된 - get()마다, 뒤따르는- task_done()호출은 작업 항목에 관한 처리가 완료되었음을 큐에 알려줍니다.- join()이 현재 블록 중이면, 모든 항목이 처리될 때 다시 시작됩니다 (큐에- put()한 모든 항목에 대해- task_done()호출이 수신되었음을 뜻합니다).- 큐에 넣은 항목보다 더 많이 호출되면 - ValueError를 발생시킵니다.
 
우선순위 큐¶
LIFO 큐¶
예외¶
- exception asyncio.QueueEmpty¶
- 이 예외는 - get_nowait()메서드가 빈 큐에 호출될 때 발생합니다.
- exception asyncio.QueueFull¶
- put_nowait()메서드가 maxsize에 도달한 큐에 호출될 때 발생하는 예외입니다.
예제¶
큐를 사용하여 여러 동시 태스크로 작업 부하를 분산시킬 수 있습니다:
import asyncio
import random
import time
async def worker(name, queue):
    while True:
        # 큐에서 "작업 항목"을 가져옵니다.
        sleep_for = await queue.get()
        # "sleep_for" 초 동안 잡니다.
        await asyncio.sleep(sleep_for)
        # 큐에 "작업 항목"이 처리되었음을 알립니다.
        queue.task_done()
        print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
    # "작업 부하"를 저장하는 데 사용할 큐를 만듭니다.
    queue = asyncio.Queue()
    # 무작위 대기 시간을 만들어서 큐에 넣습니다.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)
    # 큐를 동시에 처리할 세 개의 worker 태스크를 만듭니다.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)
    # 큐가 완전히 처리될 때까지 기다립니다.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
    # worker 태스크를 취소합니다.
    for task in tasks:
        task.cancel()
    # 모든 worker 태스크가 취소될 때까지 기다립니다.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())