Черги

Вихідний код: Lib/asyncio/queues.py


асинхронні черги розроблені таким чином, щоб бути подібними до класів модуля queue. Хоча асинхронні черги не є потокобезпечними, вони розроблені спеціально для використання в асинхронному коді/коді очікування.

Зауважте, що методи асинхронних черг не мають параметра timeout; використовуйте функцію asyncio.wait_for(), щоб виконувати операції в черзі з тайм-аутом.

Дивіться також розділ Examples нижче.

Чергу

class asyncio.Queue(maxsize=0, *, loop=None)

Черга першим прийшов, першим вийшов (FIFO).

Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним. Якщо це ціле число, більше за 0, тоді await put() блокує, коли черга досягає maxsize, доки елемент не буде видалено get().

На відміну від стандартної потокової обробки бібліотеки queue, розмір черги завжди відомий і може бути повернутий викликом методу qsize().

Deprecated since version 3.8, will be removed in version 3.10: The loop parameter.

Цей клас не потоково безпечний.

maxsize

Кількість елементів, дозволених у черзі.

empty()

Повертає True, якщо черга порожня, False інакше.

full()

Повертає True, якщо в черзі є елементи maxsize.

If the queue was initialized with maxsize=0 (the default), then full() never returns True.

coroutine get()

Видалити та повернути елемент із черги. Якщо черга порожня, зачекайте, поки елемент стане доступним.

get_nowait()

Повернути елемент, якщо він одразу доступний, інакше підняти QueueEmpty.

coroutine join()

Блокуйте, доки всі елементи в черзі не будуть отримані та оброблені.

Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли співпрограма споживача викликає task_done(), щоб вказати, що елемент було отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля, join() розблоковується.

coroutine put(item)

Помістіть товар у чергу. Якщо черга заповнена, зачекайте, поки з’явиться вільне місце, перш ніж додавати елемент.

put_nowait(item)

Поставте елемент у чергу без блокування.

Якщо вільного місця немає, підніміть QueueFull.

qsize()

Повернути кількість елементів у черзі.

task_done()

Вказує на те, що завдання, яке раніше було в черзі, виконано.

Використовується споживачами черги. Для кожного get(), який використовується для отримання завдання, наступний виклик task_done() повідомляє черзі, що обробку завдання завершено.

Якщо join() зараз блокує, воно відновиться, коли всі елементи буде оброблено (це означає, що виклик task_done() отримано для кожного елемента, який був put() в черга).

Викликає ValueError, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.

Пріоритетна черга

class asyncio.PriorityQueue

Варіант Queue; отримує записи в порядку пріоритету (найнижчий спочатку).

Записи зазвичай є кортежами у формі (число_пріоритету, дані).

Черга LIFO

class asyncio.LifoQueue

Варіант Queue, який першими отримує нещодавно додані записи (останнім увійшов, першим вийшов).

Винятки

exception asyncio.QueueEmpty

Цей виняток виникає, коли метод get_nowait() викликається в порожній черзі.

exception asyncio.QueueFull

Виняток виникає, коли метод put_nowait() викликається в черзі, яка досягла максимального розміру.

Приклади

Черги можна використовувати для розподілу робочого навантаження між кількома одночасними завданнями:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())