Черги

Вихідний код: Lib/asyncio/queues.py


асинхронні черги розроблені таким чином, щоб бути подібними до класів модуля queue. Хоча асинхронні черги не є потокобезпечними, вони розроблені спеціально для використання в асинхронному коді/коді очікування.

Зауважте, що методи асинхронних черг не мають параметра timeout; використовуйте функцію asyncio.wait_for(), щоб виконувати операції в черзі з тайм-аутом.

Дивіться також розділ Examples нижче.

Чергу

class asyncio.Queue(maxsize=0)

Черга першим прийшов, першим вийшов (FIFO).

Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним. Якщо це ціле число, більше за 0, тоді await put() блокує, коли черга досягає maxsize, доки елемент не буде видалено get().

На відміну від стандартної потокової обробки бібліотеки queue, розмір черги завжди відомий і може бути повернутий викликом методу qsize().

Змінено в версії 3.10: Видалено параметр loop.

Цей клас не потоково безпечний.

maxsize

Кількість елементів, дозволених у черзі.

empty()

Повертає True, якщо черга порожня, False інакше.

full()

Повертає True, якщо в черзі є елементи maxsize.

If the queue was initialized with maxsize=0 (the default), then full() never returns True.

async get()

Видалити та повернути елемент із черги. Якщо черга порожня, зачекайте, поки елемент стане доступним.

Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

get_nowait()

Повернути елемент, якщо він одразу доступний, інакше підняти QueueEmpty.

async join()

Блокуйте, доки всі елементи в черзі не будуть отримані та оброблені.

Кількість незавершених завдань зростає щоразу, коли елемент додається до черги. Підрахунок зменшується щоразу, коли співпрограма споживача викликає task_done(), щоб вказати, що елемент було отримано та вся робота над ним завершена. Коли кількість незавершених завдань падає до нуля, join() розблоковується.

async put(item)

Помістіть товар у чергу. Якщо черга заповнена, зачекайте, поки з’явиться вільне місце, перш ніж додавати елемент.

Raises QueueShutDown if the queue has been shut down.

put_nowait(item)

Поставте елемент у чергу без блокування.

Якщо вільного місця немає, підніміть QueueFull.

qsize()

Повернути кількість елементів у черзі.

shutdown(immediate=False)

Put a Queue instance into a shutdown mode.

The queue can no longer grow. Future calls to put() raise QueueShutDown. Currently blocked callers of put() will be unblocked and will raise QueueShutDown in the formerly blocked thread.

If immediate is false (the default), the queue can be wound down normally with get() calls to extract tasks that have already been loaded.

And if task_done() is called for each remaining task, a pending join() will be unblocked normally.

Once the queue is empty, future calls to get() will raise QueueShutDown.

If immediate is true, the queue is terminated immediately. The queue is drained to be completely empty and the count of unfinished tasks is reduced by the number of tasks drained. If unfinished tasks is zero, callers of join() are unblocked. Also, blocked callers of get() are unblocked and will raise QueueShutDown because the queue is empty.

Use caution when using join() with immediate set to true. This unblocks the join even when no work has been done on the tasks, violating the usual invariant for joining a queue.

Added in version 3.13.

task_done()

Indicate that a formerly enqueued work item is complete.

Used by queue consumers. For each get() used to fetch a work item, a subsequent call to task_done() tells the queue that the processing on the work item is complete.

Якщо join() зараз блокує, воно відновиться, коли всі елементи буде оброблено (це означає, що виклик task_done() отримано для кожного елемента, який був put() в черга).

Викликає ValueError, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.

Пріоритетна черга

class asyncio.PriorityQueue

Варіант Queue; отримує записи в порядку пріоритету (найнижчий спочатку).

Записи зазвичай є кортежами у формі (число_пріоритету, дані).

Черга LIFO

class asyncio.LifoQueue

Варіант Queue, який першими отримує нещодавно додані записи (останнім увійшов, першим вийшов).

Винятки

exception asyncio.QueueEmpty

Цей виняток виникає, коли метод get_nowait() викликається в порожній черзі.

exception asyncio.QueueFull

Виняток виникає, коли метод put_nowait() викликається в черзі, яка досягла максимального розміру.

exception asyncio.QueueShutDown

Exception raised when put() or get() is called on a queue which has been shut down.

Added in version 3.13.

Приклади

Черги можна використовувати для розподілу робочого навантаження між кількома одночасними завданнями:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())