queue — A synchronized queue class

Вихідний код: Lib/queue.py


Модуль queue реалізує черги з кількома виробниками та кількома споживачами. Це особливо корисно в потоковому програмуванні, коли необхідно безпечно обмінюватися інформацією між декількома потоками. Клас Queue у цьому модулі реалізує всю необхідну семантику блокування.

The module implements three types of queue, which differ only in the order in which the entries are retrieved. In a FIFO queue, the first tasks added are the first retrieved. In a LIFO queue, the most recently added entry is the first retrieved (operating like a stack). With a priority queue, the entries are kept sorted (using the heapq module) and the lowest valued entry is retrieved first.

Внутрішньо ці три типи черг використовують блокування для тимчасового блокування конкуруючих потоків; однак вони не призначені для обробки повторного входу в потік.

Крім того, модуль реалізує «простий» тип черги FIFO, SimpleQueue, конкретна реалізація якого забезпечує додаткові гарантії в обмін на меншу функціональність.

Модуль queue визначає такі класи та винятки:

class queue.Queue(maxsize=0)

Конструктор для черги FIFO. maxsize — це ціле число, яке встановлює верхню межу кількості елементів, які можна розмістити в черзі. Вставлення буде заблоковано, коли буде досягнуто цього розміру, доки елементи черги не будуть використані. Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним.

class queue.LifoQueue(maxsize=0)

Конструктор для LIFO черги. maxsize — це ціле число, яке встановлює верхню межу кількості елементів, які можна розмістити в черзі. Вставлення буде заблоковано після досягнення цього розміру, доки елементи черги не будуть використані. Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним.

class queue.PriorityQueue(maxsize=0)

Конструктор для пріоритетної черги. maxsize — це ціле число, яке встановлює верхню межу кількості елементів, які можна розмістити в черзі. Вставлення буде заблоковано після досягнення цього розміру, доки елементи черги не будуть використані. Якщо maxsize менше або дорівнює нулю, розмір черги є нескінченним.

The lowest valued entries are retrieved first (the lowest valued entry is the one that would be returned by min(entries)). A typical pattern for entries is a tuple in the form: (priority_number, data).

Якщо елементи data не можна порівняти, дані можна загорнути в клас, який ігнорує елемент даних і порівнює лише номер пріоритету:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)
class queue.SimpleQueue

Конструктор для необмеженої черги FIFO. Прості черги не мають розширених функцій, таких як відстеження завдань.

Added in version 3.7.

exception queue.Empty

Виняток виникає, коли неблокуючий get() (або get_nowait()) викликається для порожнього об’єкта Queue.

exception queue.Full

Виняток виникає, коли неблокуючий put() (або put_nowait()) викликається для заповненого об’єкта Queue.

exception queue.ShutDown

Exception raised when put() or get() is called on a Queue object which has been shut down.

Added in version 3.13.

Об’єкти черги

Об’єкти черги (Queue, LifoQueue або PriorityQueue) надають публічні методи, описані нижче.

Queue.qsize()

Повертає приблизний розмір черги. Зауважте, що qsize() > 0 не гарантує, що наступний get() не заблокує, а також qsize() < maxsize не гарантує, що put() не заблокує.

Queue.empty()

Повертає True, якщо черга порожня, False інакше. Якщо empty() повертає True, це не гарантує, що наступний виклик put() не заблокується. Подібним чином, якщо empty() повертає False, це не гарантує, що наступний виклик get() не заблокується.

Queue.full()

Повертає True, якщо черга заповнена, False інакше. Якщо full() повертає True, це не гарантує, що наступний виклик get() не заблокується. Так само, якщо full() повертає False, це не гарантує, що наступний виклик put() не буде заблоковано.

Queue.put(item, block=True, timeout=None)

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

Raises ShutDown if the queue has been shut down.

Queue.put_nowait(item)

Еквівалент put(item, block=False).

Queue.get(block=True, timeout=None)

Видалити та повернути елемент із черги. Якщо необов’язкові аргументи block мають значення true, а timeout має значення None (за замовчуванням), за потреби блокуйте, доки елемент не стане доступним. Якщо timeout є позитивним числом, він блокує щонайбільше timeout секунд і викликає виключення Empty, якщо жоден елемент не був доступний протягом цього часу. В іншому випадку (block — false), повертає елемент, якщо він одразу доступний, інакше викликає виняток Empty (у цьому випадку timeout ігнорується).

Prior to 3.0 on POSIX systems, and for all versions on Windows, if block is true and timeout is None, this operation goes into an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a KeyboardInterrupt.

Raises ShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately.

Queue.get_nowait()

Еквівалент get(False).

Пропонуються два методи підтримки відстеження того, чи завдання, поставлені в чергу, були повністю оброблені потоками споживачів демона.

Queue.task_done()

Вказує на те, що завдання, яке раніше було в черзі, виконано. Використовується потоками споживачів черги. Для кожного get(), який використовується для отримання завдання, наступний виклик task_done() повідомляє черзі, що обробку завдання завершено.

Якщо join() зараз блокується, воно відновиться, коли всі елементи буде оброблено (це означає, що виклик task_done() отримано для кожного елемента, який був put() у чергу) .

shutdown(immediate=True) calls task_done() for each remaining item in the queue.

Викликає ValueError, якщо викликається стільки разів, скільки було елементів, розміщених у черзі.

Queue.join()

Блокує, доки всі елементи в черзі не будуть отримані та оброблені.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

Приклад очікування виконання завдань, поставлених у чергу:

import threading
import queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()

# Send thirty task requests to the worker.
for item in range(30):
    q.put(item)

# Block until all tasks are done.
q.join()
print('All work completed')

Terminating queues

Queue objects can be made to prevent further interaction by shutting them down.

Queue.shutdown(immediate=False)

Shut down the queue, making get() and put() raise ShutDown.

By default, get() on a shut down queue will only raise once the queue is empty. Set immediate to true to make get() raise immediately instead.

All blocked callers of put() and get() will be unblocked. If immediate is true, a task will be marked as done for each remaining item in the queue, which may unblock callers of join().

Added in version 3.13.

Об’єкти SimpleQueue

Об’єкти SimpleQueue забезпечують публічні методи, описані нижче.

SimpleQueue.qsize()

Повертає приблизний розмір черги. Зауважте, що qsize() > 0 не гарантує, що наступний get() не заблокує.

SimpleQueue.empty()

Return True if the queue is empty, False otherwise. If empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

SimpleQueue.put(item, block=True, timeout=None)

Поставте товар в чергу. Метод ніколи не блокується і завжди виконується успішно (за винятком потенційних низькорівневих помилок, таких як помилка виділення пам’яті). Необов’язкові аргументи block і timeout ігноруються та надаються лише для сумісності з Queue.put().

Деталі реалізації CPython: This method has a C implementation which is reentrant. That is, a put() or get() call can be interrupted by another put() call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as __del__ methods or weakref callbacks.

SimpleQueue.put_nowait(item)

Еквівалент put(item, block=False), передбачений для сумісності з Queue.put_nowait().

SimpleQueue.get(block=True, timeout=None)

Видалити та повернути елемент із черги. Якщо необов’язкові аргументи block мають значення true, а timeout має значення None (за замовчуванням), за потреби блокуйте, доки елемент не стане доступним. Якщо timeout є позитивним числом, він блокує щонайбільше timeout секунд і викликає виключення Empty, якщо жоден елемент не був доступний протягом цього часу. В іншому випадку (block — false), повертає елемент, якщо він одразу доступний, інакше викликає виняток Empty (у цьому випадку timeout ігнорується).

SimpleQueue.get_nowait()

Еквівалент get(False).

Дивись також

Клас multiprocessing.Queue

Клас черги для використання в багатопроцесорному (а не багатопоточному) контексті.

collections.deque — це альтернативна реалізація необмежених черг із швидкими атомарними операціями append() і popleft(), які не потребують блокування, а також підтримують індексація.