Черги¶
Вихідний код: Lib/asyncio/queues.py
асинхронні черги розроблені таким чином, щоб бути подібними до класів модуля queue. Хоча асинхронні черги не є потокобезпечними, вони розроблені спеціально для використання в асинхронному коді/коді очікування.
Зауважте, що методи асинхронних черг не мають параметра timeout; використовуйте функцію asyncio.wait_for(), щоб виконувати операції в черзі з тайм-аутом.
Дивіться також розділ Examples нижче.
Чергу¶
-
class
asyncio.Queue(maxsize=0, *, loop=None)¶ Черга першим прийшов, першим вийшов (FIFO).
Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним. Якщо це ціле число, більше за
0, тодіawait put()блокує, коли черга досягає maxsize, доки елемент не буде видаленоget().На відміну від стандартної потокової обробки бібліотеки
queue, розмір черги завжди відомий і може бути повернутий викликом методуqsize().Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.
Цей клас не потоково безпечний.
-
maxsize¶ Кількість елементів, дозволених у черзі.
-
empty()¶ Повертає
True, якщо черга порожня,Falseінакше.
-
full()¶ Повертає
True, якщо в черзі є елементиmaxsize.If the queue was initialized with
maxsize=0(the default), thenfull()never returnsTrue.
-
coroutine
get()¶ Видалити та повернути елемент із черги. Якщо черга порожня, зачекайте, поки елемент стане доступним.
-
get_nowait()¶ Повернути елемент, якщо він одразу доступний, інакше підняти
QueueEmpty.
-
coroutine
join()¶ Блокуйте, доки всі елементи в черзі не будуть отримані та оброблені.
Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли співпрограма споживача викликає
task_done(), щоб вказати, що елемент було отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля,join()розблоковується.
-
coroutine
put(item)¶ Помістіть товар у чергу. Якщо черга заповнена, зачекайте, поки з’явиться вільне місце, перш ніж додавати елемент.
-
put_nowait(item)¶ Поставте елемент у чергу без блокування.
Якщо вільного місця немає, підніміть
QueueFull.
-
qsize()¶ Повернути кількість елементів у черзі.
-
task_done()¶ Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each
get()used to fetch a task, a subsequent call totask_done()tells the queue that the processing on the task is complete.Якщо
join()зараз блокує, воно відновиться, коли всі елементи буде оброблено (це означає, що викликtask_done()отримано для кожного елемента, який бувput()в черга).Викликає
ValueError, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.
-
Пріоритетна черга¶
Черга LIFO¶
Винятки¶
-
exception
asyncio.QueueEmpty¶ Цей виняток виникає, коли метод
get_nowait()викликається в порожній черзі.
-
exception
asyncio.QueueFull¶ Виняток виникає, коли метод
put_nowait()викликається в черзі, яка досягла максимального розміру.
Приклади¶
Черги можна використовувати для розподілу робочого навантаження між кількома одночасними завданнями:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())