Черги¶
Вихідний код: Lib/asyncio/queues.py
асинхронні черги розроблені таким чином, щоб бути подібними до класів модуля queue
. Хоча асинхронні черги не є потокобезпечними, вони розроблені спеціально для використання в асинхронному коді/коді очікування.
Зауважте, що методи асинхронних черг не мають параметра timeout; використовуйте функцію asyncio.wait_for()
, щоб виконувати операції в черзі з тайм-аутом.
Дивіться також розділ Examples нижче.
Чергу¶
-
class
asyncio.
Queue
(maxsize=0, *, loop=None)¶ Черга першим прийшов, першим вийшов (FIFO).
Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним. Якщо це ціле число, більше за
0
, тодіawait put()
блокує, коли черга досягає maxsize, доки елемент не буде видаленоget()
.На відміну від стандартної потокової обробки бібліотеки
queue
, розмір черги завжди відомий і може бути повернутий викликом методуqsize()
.Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.
Цей клас не потоково безпечний.
-
maxsize
¶ Кількість елементів, дозволених у черзі.
-
empty
()¶ Повертає
True
, якщо черга порожня,False
інакше.
-
full
()¶ Повертає
True
, якщо в черзі є елементиmaxsize
.If the queue was initialized with
maxsize=0
(the default), thenfull()
never returnsTrue
.
-
coroutine
get
()¶ Видалити та повернути елемент із черги. Якщо черга порожня, зачекайте, поки елемент стане доступним.
-
get_nowait
()¶ Повернути елемент, якщо він одразу доступний, інакше підняти
QueueEmpty
.
-
coroutine
join
()¶ Блокуйте, доки всі елементи в черзі не будуть отримані та оброблені.
Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли співпрограма споживача викликає
task_done()
, щоб вказати, що елемент було отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля,join()
розблоковується.
-
coroutine
put
(item)¶ Помістіть товар у чергу. Якщо черга заповнена, зачекайте, поки з’явиться вільне місце, перш ніж додавати елемент.
-
put_nowait
(item)¶ Поставте елемент у чергу без блокування.
Якщо вільного місця немає, підніміть
QueueFull
.
-
qsize
()¶ Повернути кількість елементів у черзі.
-
task_done
()¶ Вказує на те, що завдання, яке раніше було в черзі, виконано.
Використовується споживачами черги. Для кожного
get()
, який використовується для отримання завдання, наступний викликtask_done()
повідомляє черзі, що обробку завдання завершено.Якщо
join()
зараз блокує, воно відновиться, коли всі елементи буде оброблено (це означає, що викликtask_done()
отримано для кожного елемента, який бувput()
в черга).Викликає
ValueError
, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.
-
Пріоритетна черга¶
Черга LIFO¶
Винятки¶
-
exception
asyncio.
QueueEmpty
¶ Цей виняток виникає, коли метод
get_nowait()
викликається в порожній черзі.
-
exception
asyncio.
QueueFull
¶ Виняток виникає, коли метод
put_nowait()
викликається в черзі, яка досягла максимального розміру.
Приклади¶
Черги можна використовувати для розподілу робочого навантаження між кількома одночасними завданнями:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())