Черги¶
Вихідний код: Lib/asyncio/queues.py
асинхронні черги розроблені таким чином, щоб бути подібними до класів модуля queue
. Хоча асинхронні черги не є потокобезпечними, вони розроблені спеціально для використання в асинхронному коді/коді очікування.
Зауважте, що методи асинхронних черг не мають параметра timeout; використовуйте функцію asyncio.wait_for()
, щоб виконувати операції в черзі з тайм-аутом.
Дивіться також розділ Examples нижче.
Чергу¶
- class asyncio.Queue(maxsize=0)¶
Черга першим прийшов, першим вийшов (FIFO).
Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним. Якщо це ціле число, більше за
0
, тодіawait put()
блокує, коли черга досягає maxsize, доки елемент не буде видаленоget()
.На відміну від стандартної потокової обробки бібліотеки
queue
, розмір черги завжди відомий і може бути повернутий викликом методуqsize()
.Змінено в версії 3.10: Видалено параметр loop.
Цей клас не потоково безпечний.
- maxsize¶
Кількість елементів, дозволених у черзі.
- empty()¶
Повертає
True
, якщо черга порожня,False
інакше.
- full()¶
Повертає
True
, якщо в черзі є елементиmaxsize
.If the queue was initialized with
maxsize=0
(the default), thenfull()
never returnsTrue
.
- coroutine get()¶
Видалити та повернути елемент із черги. Якщо черга порожня, зачекайте, поки елемент стане доступним.
Raises
QueueShutDown
if the queue has been shut down and is empty, or if the queue has been shut down immediately.
- get_nowait()¶
Повернути елемент, якщо він одразу доступний, інакше підняти
QueueEmpty
.
- coroutine join()¶
Блокуйте, доки всі елементи в черзі не будуть отримані та оброблені.
Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли співпрограма споживача викликає
task_done()
, щоб вказати, що елемент було отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля,join()
розблоковується.
- coroutine put(item)¶
Помістіть товар у чергу. Якщо черга заповнена, зачекайте, поки з’явиться вільне місце, перш ніж додавати елемент.
Raises
QueueShutDown
if the queue has been shut down.
- put_nowait(item)¶
Поставте елемент у чергу без блокування.
Якщо вільного місця немає, підніміть
QueueFull
.
- qsize()¶
Повернути кількість елементів у черзі.
- shutdown(immediate=False)¶
Shut down the queue, making
get()
andput()
raiseQueueShutDown
.By default,
get()
on a shut down queue will only raise once the queue is empty. Set immediate to true to makeget()
raise immediately instead.All blocked callers of
put()
andget()
will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers ofjoin()
.Added in version 3.13.
- task_done()¶
Вказує на те, що завдання, яке раніше було в черзі, виконано.
Використовується споживачами черги. Для кожного
get()
, який використовується для отримання завдання, наступний викликtask_done()
повідомляє черзі, що обробку завдання завершено.Якщо
join()
зараз блокує, воно відновиться, коли всі елементи буде оброблено (це означає, що викликtask_done()
отримано для кожного елемента, який бувput()
в черга).shutdown(immediate=True)
callstask_done()
for each remaining item in the queue.Викликає
ValueError
, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.
Пріоритетна черга¶
Черга LIFO¶
Винятки¶
- exception asyncio.QueueEmpty¶
Цей виняток виникає, коли метод
get_nowait()
викликається в порожній черзі.
- exception asyncio.QueueFull¶
Виняток виникає, коли метод
put_nowait()
викликається в черзі, яка досягла максимального розміру.
Приклади¶
Черги можна використовувати для розподілу робочого навантаження між кількома одночасними завданнями:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())