Черги

Вихідний код: Lib/asyncio/queues.py


асинхронні черги розроблені таким чином, щоб бути подібними до класів модуля queue. Хоча асинхронні черги не є потокобезпечними, вони розроблені спеціально для використання в асинхронному коді/коді очікування.

Зауважте, що методи асинхронних черг не мають параметра timeout; використовуйте функцію asyncio.wait_for(), щоб виконувати операції в черзі з тайм-аутом.

Дивіться також розділ Examples нижче.

Чергу

class asyncio.Queue(maxsize=0)

Черга першим прийшов, першим вийшов (FIFO).

Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним. Якщо це ціле число, більше за 0, тоді await put() блокує, коли черга досягає maxsize, доки елемент не буде видалено get().

На відміну від стандартної потокової обробки бібліотеки queue, розмір черги завжди відомий і може бути повернутий викликом методу qsize().

Змінено в версії 3.10: Видалено параметр loop.

Цей клас не потоково безпечний.

maxsize

Кількість елементів, дозволених у черзі.

empty()

Повертає True, якщо черга порожня, False інакше.

full()

Повертає True, якщо в черзі є елементи maxsize.

If the queue was initialized with maxsize=0 (the default), then full() never returns True.

coroutine get()

Видалити та повернути елемент із черги. Якщо черга порожня, зачекайте, поки елемент стане доступним.

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait()

Повернути елемент, якщо він одразу доступний, інакше підняти QueueEmpty.

coroutine join()

Блокуйте, доки всі елементи в черзі не будуть отримані та оброблені.

Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли співпрограма споживача викликає task_done(), щоб вказати, що елемент було отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля, join() розблоковується.

coroutine put(item)

Помістіть товар у чергу. Якщо черга заповнена, зачекайте, поки з’явиться вільне місце, перш ніж додавати елемент.

Raises QueueShutDown if the queue has been shut down.

put_nowait(item)

Поставте елемент у чергу без блокування.

Якщо вільного місця немає, підніміть QueueFull.

qsize()

Повернути кількість елементів у черзі.

shutdown(immediate=False)

Shut down the queue, making get() and put() raise QueueShutDown.

By default, get() on a shut down queue will only raise once the queue is empty. Set immediate to true to make get() raise immediately instead.

All blocked callers of put() and get() will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers of join().

Added in version 3.13.

task_done()

Вказує на те, що завдання, яке раніше було в черзі, виконано.

Використовується споживачами черги. Для кожного get(), який використовується для отримання завдання, наступний виклик task_done() повідомляє черзі, що обробку завдання завершено.

Якщо join() зараз блокує, воно відновиться, коли всі елементи буде оброблено (це означає, що виклик task_done() отримано для кожного елемента, який був put() в черга).

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

Викликає ValueError, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.

Пріоритетна черга

class asyncio.PriorityQueue

Варіант Queue; отримує записи в порядку пріоритету (найнижчий спочатку).

Записи зазвичай є кортежами у формі (число_пріоритету, дані).

Черга LIFO

class asyncio.LifoQueue

Варіант Queue, який першими отримує нещодавно додані записи (останнім увійшов, першим вийшов).

Винятки

exception asyncio.QueueEmpty

Цей виняток виникає, коли метод get_nowait() викликається в порожній черзі.

exception asyncio.QueueFull

Виняток виникає, коли метод put_nowait() викликається в черзі, яка досягла максимального розміру.

exception asyncio.QueueShutDown

Exception raised when put() or get() is called on a queue which has been shut down.

Added in version 3.13.

Приклади

Черги можна використовувати для розподілу робочого навантаження між кількома одночасними завданнями:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())